
[TOC]


A Detailed Guide to the ZooKeeper Client Curator


Introduction

Curator is an open-source ZooKeeper client framework from Netflix. It takes care of many of the tricky low-level details of ZooKeeper client development, including connection management and reconnection, repeated registration of Watchers, and exceptions such as NodeExistsException. Patrick Hunt (a ZooKeeper committer) paid Curator a high compliment with the line "Guava is to Java what Curator is to ZooKeeper."

An aside on the name: the origin of "ZooKeeper" is a fun story. The following is adapted from the book From Paxos to ZooKeeper: Distributed Consistency Principles and Practice:

ZooKeeper originated in a research group at Yahoo! Research. At the time, the researchers found that many large systems inside Yahoo relied on similar ad-hoc systems for distributed coordination, and those systems often had distributed single points of failure. So the Yahoo developers set out to build a general-purpose distributed coordination framework free of single points of failure. Early in the project, noting that many projects were named after animals (the famous Pig project, for example), the engineers wanted an animal name for this system as well. Raghu Ramakrishnan, then chief scientist of the lab, joked: "At this rate, we'll turn into a zoo." Everyone promptly agreed to call it the zookeeper: with all the animal-named distributed components put together, Yahoo's whole distributed system looked like a large zoo, and ZooKeeper was exactly what would coordinate that distributed environment. And so the name ZooKeeper was born.

Curator is without doubt the Swiss Army knife among ZooKeeper clients. The word translates as "curator" or "custodian"; whether or not the development team intended it, the author's guess is that the name suggests Curator is ZooKeeper's keeper (stretching the metaphor a little: the keeper of the zoo).

Curator consists of several modules:

  • curator-framework: a wrapper around ZooKeeper's low-level API
  • curator-client: client-side utilities, such as retry policies
  • curator-recipes: packaged high-level features, such as cache event listeners, leader election, distributed locks, distributed counters, and distributed barriers

Maven dependencies (this article uses Curator 2.12.0, which targets ZooKeeper 3.4.x; crossing versions causes compatibility problems and is quite likely to make node operations fail):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>


Basic Curator APIs


Creating a session


1. Creating a client with the static factory method

An example:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

The newClient static factory method takes four key parameters:

Parameter  Description
connectionString  server list, in the form host1:port1,host2:port2,…
retryPolicy  retry policy; several policies are built in, and you can also implement the RetryPolicy interface yourself
sessionTimeoutMs  session timeout in milliseconds (default 60000 ms)
connectionTimeoutMs  connection timeout in milliseconds (default 15000 ms)
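The built-in retry policies live in curator-client. As a quick sketch of constructing them (assuming Curator 2.12.0 on the classpath; the class names are real, but the chosen constants are arbitrary):

```java
import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.retry.RetryUntilElapsed;

public class RetryPolicySketch {
    public static void main(String[] args) {
        // Sleep grows exponentially from a 1s base; give up after 3 retries
        RetryPolicy exponential = new ExponentialBackoffRetry(1000, 3);
        // Retry at most 3 times, sleeping 1s between attempts
        RetryPolicy nTimes = new RetryNTimes(3, 1000);
        // Keep retrying (1s apart) until 60s of total elapsed time has passed
        RetryPolicy untilElapsed = new RetryUntilElapsed(60_000, 1000);
        // Retry exactly once after a 1s pause
        RetryPolicy oneTime = new RetryOneTime(1000);
        System.out.println("policies created");
    }
}
```

Any of these can be passed wherever the examples in this article use retryPolicy.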


2. Creating a session with the fluent-style API

The core parameters become fluent setters. An example:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();


3. Creating a session with an isolated namespace

To isolate different ZooKeeper applications from one another, each application needs its own independent namespace, i.e., a dedicated ZooKeeper root path (in official terms: ZooKeeper's "chroot" feature). In the example below, the client specifies the namespace "/base", so every data-node operation that client performs is resolved relative to that directory. Setting a chroot binds a client application to one subtree of the ZooKeeper server, which is very useful for mutual isolation when multiple applications share a single ZooKeeper cluster.

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();


Starting the client

Once the session has been created successfully and you have the client instance, simply call its start() method:

client.start();


Data node operations


Creating data nodes

ZooKeeper node creation modes:

  • PERSISTENT: persistent
  • PERSISTENT_SEQUENTIAL: persistent, with a sequence number
  • EPHEMERAL: ephemeral
  • EPHEMERAL_SEQUENTIAL: ephemeral, with a sequence number

Create a node, with empty initial content

client.create().forPath("path");

Note: if no node attributes are set, the creation mode defaults to persistent and the content defaults to empty.

Create a node, with initial content

client.create().forPath("path","init".getBytes());

Create a node, specifying the creation mode (ephemeral), with empty content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

Create a node, specifying the creation mode (ephemeral), with initial content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

Create a node, specifying the creation mode (ephemeral), with initial content, recursively creating parent nodes as needed

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

The creatingParentContainersIfNeeded() interface is extremely useful: normally a developer has to check whether the parent node exists before creating a child node, because creating it directly would otherwise throw NoNodeException. With creatingParentContainersIfNeeded(), Curator automatically creates all required parent nodes recursively.


Deleting data nodes

Delete a node

client.delete().forPath("path");

Note: this method can only delete leaf nodes; otherwise an exception is thrown.

Delete a node, recursively deleting all of its children

client.delete().deletingChildrenIfNeeded().forPath("path");

Delete a node, forcing a specific version

client.delete().withVersion(10086).forPath("path");

Delete a node, with guaranteed deletion

client.delete().guaranteed().forPath("path");

The guaranteed() interface is a safety net: as long as the client session is valid, Curator keeps retrying the delete in the background until the node is successfully removed.

Note: the fluent interfaces above can be freely combined, for example:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");


Reading node data

Read the data content of a node

client.getData().forPath("path");

Note: the return value of this method is byte[].

Read the data content of a node, while also retrieving the node's Stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");


Updating node data

Update the data content of a node

client.setData().forPath("path","data".getBytes());

Note: this interface returns a Stat instance.
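Combining getData().storingStatIn() with setData().withVersion() gives an optimistic, compare-and-set style update. A sketch (the path "/demo" and the use of curator-test's TestingServer are this author's scaffolding, not part of the original examples):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.data.Stat;

public class CasUpdateDemo {
    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().forPath("/demo", "v1".getBytes());

        // Read the data and capture the node's Stat (including its version)
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/demo");

        // The update succeeds only if the version still matches; a concurrent
        // writer would cause a KeeperException.BadVersionException here
        Stat newStat = client.setData()
                             .withVersion(stat.getVersion())
                             .forPath("/demo", "v2".getBytes());
        System.out.println("updated from version " + stat.getVersion()
                + " to " + newStat.getVersion());
        client.close();
        server.close();
    }
}
```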

Update the data content of a node, forcing a specific version

client.setData().withVersion(10086).forPath("path","data".getBytes());


Checking whether a node exists

client.checkExists().forPath("path");

Note: this method returns a Stat instance (null if the node does not exist).
Additional methods (watches or background processing) can be chained, with
forPath() called last to specify the ZNode to operate on.
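A minimal sketch of the null-versus-Stat contract, again using curator-test's TestingServer and an assumed path "/demo":

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.data.Stat;

public class CheckExistsDemo {
    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();

        // checkExists() returns null while the node is absent...
        Stat stat = client.checkExists().forPath("/demo");
        System.out.println("before create: " + (stat == null ? "absent" : "present"));

        // ...and a populated Stat once it has been created
        client.create().forPath("/demo");
        stat = client.checkExists().forPath("/demo");
        System.out.println("after create: " + (stat == null ? "absent" : "present"));

        client.close();
        server.close();
    }
}
```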


Getting all child node paths of a node

client.getChildren().forPath("path");

Note: this method returns a List<String> containing the child node paths of the
ZNode. Additional methods (watch, background processing, or getting the stat)
can be chained, with forPath() called last to specify the parent ZNode.
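For example, usingWatcher() attaches a one-shot native watcher while listing children. The paths below are invented for the sketch; remember that a native watcher fires only once per registration, which is exactly the limitation the Cache recipes remove:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.Watcher;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class GetChildrenWatchDemo {
    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath("/demo/a");

        CountDownLatch fired = new CountDownLatch(1);
        // usingWatcher() registers a one-shot native watcher on /demo's child list
        List<String> children = client.getChildren()
                .usingWatcher((Watcher) event -> {
                    System.out.println("watch fired: " + event.getType());
                    fired.countDown();
                })
                .forPath("/demo");
        System.out.println("children: " + children);

        client.create().forPath("/demo/b"); // triggers NodeChildrenChanged once
        fired.await();
        client.close();
        server.close();
    }
}
```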


Transactions

A CuratorFramework instance provides the inTransaction() interface method, which
opens a ZooKeeper transaction. You can compose create, setData, check, and/or
delete operations and then call commit() to submit them as one atomic
operation. An example:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();


Asynchronous interfaces

The create, delete, update, and read methods above are all synchronous. Curator also provides an asynchronous interface, introducing the BackgroundCallback interface for handling the result information returned by the server after an asynchronous call. The key callback value in BackgroundCallback is a CuratorEvent, which contains the event type, the response code, and the node's details.

CuratorEventType

Event type  Corresponding CuratorFramework method
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #Watcher(Watcher)
CLOSING #close()

Response codes (#getResultCode())

Code  Meaning
0  OK: the call succeeded
-4  ConnectionLoss: the client lost its connection to the server
-110  NodeExists: the node already exists
-112  SessionExpired: the session has expired

An example of creating a node asynchronously:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {
          System.out.println(String.format("eventType:%s,resultCode:%s",
                  curatorEvent.getType(), curatorEvent.getResultCode()));
      }, executor)
      .forPath("path");

Note: if #inBackground() is not given an executor, Curator's EventThread is used for the asynchronous processing by default.

Curator recipes (advanced features)

Reminder: you must first add the curator-recipes dependency. The sections below only explain and illustrate the use of some recipe features; they do not attempt a source-level exploration.

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Important: it is strongly recommended to monitor the connection state with a ConnectionStateListener. When the connection state is LOST, every API under curator-recipes will fail or time out, even though none of the examples below register a ConnectionStateListener.
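A minimal sketch of registering such a listener (the TestingServer and the log messages are this author's scaffolding):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class ConnectionStateDemo {
    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                server.getConnectString(), new ExponentialBackoffRetry(1000, 3));

        // The listener is invoked on every state transition:
        // CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY
        ConnectionStateListener listener = (c, newState) -> {
            System.out.println("connection state: " + newState);
            if (newState == ConnectionState.LOST) {
                // recipe state (locks, latches, ...) is no longer trustworthy here
                System.out.println("session lost - abandon recipe state");
            }
        };
        client.getConnectionStateListenable().addListener(listener);

        client.start();
        Thread.sleep(1000); // let the CONNECTED event arrive
        client.close();
        server.close();
    }
}
```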

Caches

ZooKeeper natively supports event listening through registered Watchers, but the developer has to register them over and over (a Watcher can only be registered once and fires once). Cache is Curator's wrapper over event listening: it can be viewed as a local cached view of the watched data, and it handles the repeated registration automatically. Curator provides three kinds of Watcher caches for listening to node changes.

Path Cache

Path Cache is used to watch the children of a ZNode. When a child node is added, updated, or deleted, the Path Cache changes its state to contain the latest set of children along with their data and state; state changes are reported through a PathChildrenCacheListener.

In practice, four classes are involved:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

Create a Path Cache with the following constructor:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

To use a cache, you must call its start() method, and call close() when you are done.
The startup behavior can be controlled through StartMode.

StartMode has the following three options:

  1. NORMAL: normal initialization.
  2. BUILD_INITIAL_CACHE: rebuild() is called before start() returns.
  3. POST_INITIALIZED_EVENT: a PathChildrenCacheEvent.Type#INITIALIZED event is
    sent once the cache has finished initializing its data.

public void addListener(PathChildrenCacheListener listener) registers a listener for cache changes.

The getCurrentData() method returns a List<ChildData>, which can be iterated to visit all the children.

Creating, updating, and removing nodes is still done through the client
(CuratorFramework), not through the PathChildrenCache:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("event type: " + event.getType());
            if (null != event.getData()) {
                System.out.println("node data: " + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: if the cacheData parameter in new PathChildrenCache(client, PATH, true) is set to false, then event.getData().getData() and data.getData() in the example return null, and the cache does not cache node data.

Note: the Thread.sleep(10) calls in the example can be commented out, but then some events will fail to reach the listener. This is probably related to how PathChildrenCache is implemented: events must not be triggered too frequently.

Node Cache

Node Cache is similar to Path Cache, except that Node Cache watches one single, specific node. It involves the following three classes:

  • NodeCache – the Node Cache implementation class
  • NodeCacheListener – the node listener
  • ChildData – the node data

Note: as with the other caches, you must call start() to use it and close() when you are done.

getCurrentData() returns the node's current state, from which the current value can be obtained.

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("node data: " + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("node deleted!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: the Thread.sleep(100) calls in the example can be commented out, but then some events will fail to reach the listener. This is probably related to how NodeCache is implemented: events must not be triggered too frequently.

Note: NodeCache can only watch state changes of a single node.

Tree Cache

Tree Cache can watch all nodes of an entire subtree, roughly a combination of PathCache and NodeCache. It mainly involves the following four classes:

  • TreeCache – the Tree Cache implementation class
  • TreeCacheListener – the listener class
  • TreeCacheEvent – the triggered event class
  • ChildData – the node data

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("event type: " + event.getType() +
                        " | path: " + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: this example does not use Thread.sleep(10), yet the events are all triggered normally.

Note: when TreeCache is initialized (when start() is called), it calls the TreeCacheListener back with a TreeCacheEvent whose Type is INITIALIZED and whose ChildData is null. At that point event.getData().getPath() is very likely to cause a NullPointerException; this case should be handled explicitly.

Leader election

In distributed computing, leader election is an important capability. The election process goes like this: one process is designated as the organizer and distributes tasks to the nodes. Before the task starts, no node knows who the leader (or coordinator) is; once the election algorithm has run, every node ends up recognizing one unique node as the task leader. In addition, elections frequently occur when the leader crashes unexpectedly, in which case a new leader must be elected.

In a ZooKeeper cluster, the leader handles write operations and then synchronizes the followers via the Zab protocol; both the leader and the followers can handle read operations.

Curator has two leader-election recipes: LeaderSelector and LeaderLatch.

With the former, all live clients take turns being leader without interruption. With the latter, once a leader is elected, it does not give up leadership unless a client failure triggers a new election.

LeaderLatch

LeaderLatch has two constructors:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

Starting a LeaderLatch:

leaderLatch.start();

Once started, a LeaderLatch negotiates with all other LeaderLatch participants that use the same latch path, and eventually one of them is elected leader. Use the hasLeadership() method to check whether a LeaderLatch instance is the leader:

leaderLatch.hasLeadership(); // returns true if this instance is currently the leader

Similar to the JDK's CountDownLatch, LeaderLatch blocks in await() while waiting to become leader. Once you no longer need a LeaderLatch, you must call its close() method; if the instance is the leader, closing it releases leadership, and the other participants elect a new leader.

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

Error handling:
A LeaderLatch instance can register a ConnectionStateListener to watch for connection problems. On SUSPENDED or LOST, the leader should no longer consider itself the leader. When a LOST connection is re-established (RECONNECTED), the LeaderLatch deletes its previous ZNode and creates a new one. Users of LeaderLatch must account for connection problems that can cost them their leadership.
Using a ConnectionStateListener is strongly recommended.

An example of using LeaderLatch:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

You can add the test-module dependency for convenient testing, so that no real ZooKeeper server needs to be run:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

First we create 10 LeaderLatch instances; once started, one of them is elected leader.
Because the election takes some time, the leader is not available immediately after start().
hasLeadership() checks whether this instance is the leader, returning true if so.
getLeader().getId() returns the ID of the current leader.
Leadership can only be released with close().
await() is a blocking method that tries to acquire leadership, but may never succeed.

LeaderSelector

Using LeaderSelector mainly involves the following classes:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

The core class is LeaderSelector; its constructors are:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

Like LeaderLatch, a LeaderSelector must be started: leaderSelector.start();
Once started, your listener's takeLeadership() method is called whenever this instance acquires leadership, and takeLeadership() returns only when leadership is released.
When you no longer need the LeaderSelector instance, call its close() method.

Error handling:
LeaderSelectorListener extends ConnectionStateListener, and a LeaderSelector must pay attention to connection-state changes. If the instance becomes leader, it has to react to SUSPENDED and LOST. When SUSPENDED appears, the instance must assume it may no longer be the leader until the connection has been successfully re-established. If LOST appears, the instance is no longer the leader and its takeLeadership method returns.

Important: the recommended handling is to throw a CancelLeadershipException when SUSPENDED or LOST is received. This interrupts the LeaderSelector instance and cancels the running takeLeadership method through that exception. This matters enough that you should consider extending LeaderSelectorListenerAdapter, which provides the recommended handling logic.

The following example is adapted from the official documentation:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

You can do task assignment and similar work inside takeLeadership(), and never return if you want this instance to remain leader; for example, add an infinite loop. Calling leaderSelector.autoRequeue() ensures that the instance may acquire leadership again after releasing it. Here we use an AtomicInteger to record how many times this client has obtained leadership. The election is "fair": every client has an equal chance of gaining leadership.

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

By comparison, LeaderLatch releases leadership only when close() is called, whereas LeaderSelector lets you control leadership through the LeaderSelectorListener and release it at the appropriate time, so that every node has a chance to lead. LeaderSelector is therefore more flexible and controllable; when you have a leader-election use case, prefer LeaderSelector.

Distributed locks

Notes:

1. It is recommended to monitor the connection state with a ConnectionStateListener, because when the connection is LOST you no longer hold the lock.

2. Distributed locks are globally synchronized: at no point in time will two clients hold the same lock.

Shared Reentrant Lock

Shared means the lock is globally visible and every client can request it.
Reentrant means, as with the JDK's ReentrantLock, that the same client can acquire the lock multiple times while holding it, without being blocked.
It is implemented by the class InterProcessMutex. Its constructor:

public InterProcessMutex(CuratorFramework client, String path)

Acquire the lock with acquire(), which also comes in a variant with a timeout:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

Release the lock with the release() method. An InterProcessMutex instance can be reused.

Revoking: the ZooKeeper recipes wiki defines a cooperative revocation mechanism. To make a mutex revocable, call the following method:

public void makeRevocable(RevocationListener<T> listener)
Makes the lock revocable. The listener is called when another process or thread wants you to release the lock.
Parameters:
listener - the listener

To request revocation of the current lock, call the attemptRevoke() method; note that the RevocationListener is called back when the lock should be released.

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()

One more reminder on error handling: it is again strongly recommended to handle connection-state changes with a ConnectionStateListener. When the connection is LOST, you no longer hold the lock.

First let's create a simulated shared resource. This resource is expected to be accessed by only one thread at a time, otherwise there will be concurrency problems.

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // In a real environment we would access/maintain a shared resource here.
        // With the lock held, this example never hits the IllegalStateException below;
        // without the lock, the sleep makes concurrent use (and the exception) very likely.
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

Then create an InterProcessMutexDemo class, which performs a complete access cycle: request the lock, use the resource, release the lock.

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

The code is straightforward: it creates five clients, and each client repeats the request-lock / access-resource / release-lock cycle fifty times, each client in its own thread. The output shows the lock being used exclusively by one instance at a time, in random order.

Since the lock is reentrant, you can call acquire() multiple times in one thread; while the thread holds the lock, it always returns true.

You should not use the same InterProcessMutex across multiple threads. Instead, create a new InterProcessMutex instance in each thread with the same path; they will then share one and the same lock.

Shared Lock (non-reentrant)

Compared with the InterProcessMutex above, this lock simply lacks the reentrancy feature, which means it cannot be re-acquired within the same thread. The class is InterProcessSemaphoreMutex, and it is used much like InterProcessMutex.

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " acquired the mutex");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " acquired the mutex again");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as you acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

When this is run, one and only one client acquires the first lock (its first acquire() returns true) and then blocks on its second acquire(), which times out; all the other clients time out on their first acquire() and throw an exception.

This demonstrates that the lock implemented by InterProcessSemaphoreMutex is not reentrant.

Shared Reentrant Read Write Lock

Similar to the JDK's ReentrantReadWriteLock: a read-write lock manages a pair of related locks, one for read operations and one for writes. The read lock can be held simultaneously by multiple processes as long as the write lock is free, while the write lock is exclusive and blocks readers while it is held.

The lock is reentrant. A thread holding the write lock can re-enter the read lock, but the read lock cannot enter the write lock. This also means the write lock can be downgraded to a read lock: request the write lock, then the read lock, then release the write lock (keeping the read lock). Upgrading from a read lock to a write lock is not possible.

The reentrant read-write lock is implemented mainly by two classes: InterProcessReadWriteLock and InterProcessMutex. First create an InterProcessReadWriteLock instance, then obtain the read lock or the write lock as needed; both are of type InterProcessMutex.

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // Note: acquire the write lock first and then the read lock, never the other way around!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the write lock");
        }
        System.out.println(clientName + " has acquired the write lock");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the read lock");
        }
        System.out.println(clientName + " has acquired the read lock");
        try {
            resource.use(); // use the resource
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " releasing read and write locks");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

Semaphore—Shared Semaphore

A counting semaphore, similar to JDK's Semaphore. Where JDK's Semaphore maintains a set of permits, Curator maintains leases (Lease). There are two ways to control the semaphore's maximum lease count. In the first, the user supplies a path and specifies the maximum lease count directly. In the second, the user supplies a path and a SharedCountReader. If you do not use a SharedCountReader,
you must ensure that all instances across all processes use the same (maximum) lease count; otherwise an instance in process A may believe the maximum is 10 while instances in process B believe it is 20, and at that point the leases are meaningless.
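
The permit/lease analogy can be seen with the JDK's in-process Semaphore (a local stand-in, not Curator's API), including the timed acquisition that gives up when not enough permits exist:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class PermitDemo {
    // Acquire some permits and observe how many remain.
    public static int remainingAfterAcquire() throws InterruptedException {
        Semaphore semaphore = new Semaphore(10); // 10 permits, like a max lease count of 10
        semaphore.acquire(5);                    // take 5 permits at once
        return semaphore.availablePermits();     // 5 left
    }

    public static boolean timedAcquireTooMany() throws InterruptedException {
        Semaphore semaphore = new Semaphore(4);
        // Asking for more permits than exist times out and returns false,
        // mirroring Curator's acquire(qty, time, unit) returning null on timeout.
        return semaphore.tryAcquire(5, 100, TimeUnit.MILLISECONDS);
    }
}
```

Unlike the JDK version, Curator's leases are tied to the client session, which is why a crashed client's leases are released automatically.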

Each call to acquire() returns a lease object.
The client must close the lease in a finally block, otherwise the lease is lost.
However, if the client session is lost for some reason, such as a crash,
the leases held by that client are closed automatically,
so that other clients can use them. Leases can also be returned with:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request several leases at once; if the semaphore does not currently have enough leases, the requesting thread blocks.
Overloads with a timeout are also provided.

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

The main classes used by Shared Semaphore are the following:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

First we acquire 5 leases, and at the end we return them to the semaphore.
We then request one more lease; since the semaphore still has 5 leases available, the request succeeds and returns one lease, leaving 4.
We then request 5 more leases; since only 4 remain, the request blocks until the timeout and still cannot be satisfied, returning null (when leases are insufficient the call blocks until the timeout and then returns null rather than throwing; without a timeout it blocks indefinitely).

All the locks described above are fair. From ZooKeeper's point of view,
every client obtains the lock in the order it requested it; there is no unfair preemption.

Multiple Shared Locks—Multi Shared Lock

A Multi Shared Lock is a container of locks. When acquire() is called,
all contained locks are acquired; if that fails, every acquired lock is released.
Likewise, calling release() releases all the locks (failures are ignored).
Essentially, it represents a group of locks: acquire and release operations on it are forwarded to every lock it contains.

It mainly involves two classes:

  • InterProcessMultiLock
  • InterProcessLock

Its constructors take either a collection of locks or a list of ZooKeeper paths.

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

Usage is the same as for a shared lock.
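
The all-or-nothing semantics of a multi-lock can be sketched in-process with plain JDK ReentrantLocks. This is a hypothetical helper, not Curator code: try to acquire every lock, and if any acquisition fails, roll back the ones already held.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MultiLockSketch {
    // Acquire all locks or none: on any failure, unlock in reverse order.
    public static boolean acquireAll(List<Lock> locks) {
        Deque<Lock> held = new ArrayDeque<>();
        for (Lock lock : locks) {
            if (lock.tryLock()) {
                held.push(lock);
            } else {
                while (!held.isEmpty()) held.pop().unlock(); // roll back
                return false;
            }
        }
        return true;
    }

    // Release every lock; like InterProcessMultiLock, failures are ignored.
    public static void releaseAll(List<Lock> locks) {
        for (Lock lock : locks) {
            try {
                lock.unlock();
            } catch (RuntimeException ignored) {
            }
        }
    }

    // Self-check: acquire both, release both, then each lock is free again.
    public static boolean demo() {
        List<Lock> locks = Arrays.asList(new ReentrantLock(), new ReentrantLock());
        if (!acquireAll(locks)) return false;
        releaseAll(locks);
        return locks.get(0).tryLock() && locks.get(1).tryLock();
    }
}
```

The rollback in acquireAll mirrors what the container lock does for you: a partial acquisition never leaks held locks.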

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

We create an InterProcessMultiLock containing one reentrant lock and one non-reentrant lock.
After calling acquire() you can see that the thread holds both locks at once;
after calling release(), both locks are released.

Finally, once again:
it is strongly recommended to use a ConnectionStateListener to monitor the connection state; when the state becomes LOST, the lock is lost too.

Distributed Counters

As the name suggests, a counter counts;
ZooKeeper can be used to implement a counter shared by a whole cluster.
Any client using the same path obtains the latest counter value,
which is guaranteed by ZooKeeper's consistency. Curator has two counters:
one counts with an int (SharedCount), the other with a long (DistributedAtomicLong).

Distributed int Counter—SharedCount

This class counts with an int. It mainly involves three classes.

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount represents the counter.
You can add a SharedCountListener to it; when the counter changes, the listener is notified of the change event. A SharedCountReader can read the latest value,
both the plain value and the versioned value, VersionedValue.

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

In this example we use baseCount to monitor the count value (addListener adds the SharedCountListener).
Any SharedCount, as long as it uses the same path, can obtain this count value.
We then use five threads to add a random number under 10 to the count. A change made to the count through any SharedCount on the same path is delivered to baseCount's SharedCountListener.

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

Here we use trySetCount to set the counter.
The first parameter supplies the current VersionedValue; if another client has updated the count in the meantime,
your update may fail,
but by then your client has read the latest value, so after a failure you can simply try again.
setCount sets the counter's value unconditionally.

Note that the counter must be started with start(), and must be closed with close() when you are done with it.
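
The retry-on-failure pattern for trySetCount is the classic compare-and-set loop. The same shape is shown below with the JDK's AtomicInteger standing in for the shared counter (a local stand-in, not Curator's API):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasRetryDemo {
    // Re-read the current value and retry until the compare-and-set wins,
    // just as a failed trySetCount is retried with the freshly read VersionedValue.
    public static int addWithRetry(AtomicInteger counter, int delta) {
        while (true) {
            int current = counter.get();          // read value plus implicit "version"
            if (counter.compareAndSet(current, current + delta)) {
                return current + delta;           // our update won
            }
            // another writer got in first: loop and try again
        }
    }

    public static int demo() {
        AtomicInteger counter = new AtomicInteger(0);
        addWithRetry(counter, 7);
        addWithRetry(counter, 3);
        return counter.get(); // 10
    }
}
```

With SharedCount the "version" is explicit (the VersionedValue passed to trySetCount), but the loop structure is the same.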

A ConnectionStateListener is strongly recommended.
In this example, SharedCountListener extends ConnectionStateListener.

Distributed long Counter—DistributedAtomicLong

Now for a long counter. Besides having a larger counting range than SharedCount,
it first tries to set the counter optimistically;
if that fails (for example because the counter was updated by another client in the meantime),
it falls back to an InterProcessMutex to update the value.

This can be seen in its internal implementation, DistributedAtomicValue.trySet():

    AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

This counter supports a series of operations:

  • get(): read the current value
  • increment(): add one
  • decrement(): subtract one
  • add(): add a specific value
  • subtract(): subtract a specific value
  • trySet(): try to set the value
  • forceSet(): set the value unconditionally

You must check succeeded() on the returned result; it tells you whether the operation succeeded.
If it did, preValue() is the value before the operation,
and postValue() the value after it.

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }
    }
}

Distributed Queues

Curator can also simplify working with ephemeral nodes, and it provides the ZK-recipe distributed queue implementations. Using ZK's
PERSISTENT_SEQUENTIAL nodes
guarantees that the items put into the queue are ordered.
If a single consumer takes data from the queue, items come out first-in-first-out, which is what you expect of a queue.
If you strictly require ordering you must use a single consumer; leader election can be used so that only the leader consumes.

However, according to the Curator author at Netflix,
ZooKeeper really is not a good fit for queues, or rather ZK cannot make a good queue; for details see
Tech Note
4

There are five reasons:

  1. ZK has a 1MB transport limit.
    In practice ZNodes must be relatively small, while a queue may hold many messages, some quite large.
  2. With many nodes, ZK starts up very slowly, and a queue produces a great many ZNodes;
    you have to significantly increase initLimit and syncLimit.
  3. Large ZNodes are hard to clean up. Netflix had to write a dedicated program just for that.
  4. ZK's performance degrades when a ZNode has a very large number of children.
  5. ZK's database lives entirely in memory. A large number of queue items means a lot of memory.

Even so, Curator still provides several queue implementations.
If the queue holds neither too many items nor too-large items, they are, with some discretion, still usable.

Distributed Queue—DistributedQueue

DistributedQueue is the most ordinary kind of queue. It involves the following four classes:

  • QueueBuilder – creates the queue (and is the builder for the other queue types as well)
  • QueueConsumer – the consumer interface for queue messages
  • QueueSerializer –
    the serialization/deserialization interface for items in the queue
  • DistributedQueue – the queue implementation

QueueConsumer is the consumer: it receives data from the queue. The logic that processes queue data goes in QueueConsumer.consumeMessage().

Normally a message is first removed from the queue and then handed to the consumer. But those are two steps, not one atomic step. You can call lockPath() on the builder to have consumers take a lock: while a consumer holds the lock and is processing a message, other consumers cannot consume it, and if consumption fails or the process dies, the message is handed to another process. This costs some performance. If possible, stick to the single-consumer pattern for the queue.

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10); // wait for the messages to be consumed
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * Queue message serializer implementation
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * Defines the queue consumer
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection state changed: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consumed message (" + name + "): " + message);
            }
        };
    }
}

The example defines two distributed queues and two consumers; since they share the same PATH, the consumers compete to consume the messages.

Distributed Queue with IDs—DistributedIdQueue

DistributedIdQueue is similar to the queue above, except that an ID can be set for every element in the queue,
and any element can be removed from the queue by its ID. It involves four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

It is created with:

builder.buildIdQueue()

Putting an element:

queue.put(aMessage, messageId);

Removing an element:

int numberRemoved = queue.remove(messageId);

In this example,
some elements are removed before any consumer has consumed them, so the consumer never receives the deleted messages.

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

Distributed Priority Queue—DistributedPriorityQueue

A priority queue sorts the queue's elements by priority. The smaller the priority value,
the closer the element is to the front, and the sooner it is consumed.
It involves the following four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

It is created via builder.buildPriorityQueue(minItemsBeforeRefresh).
When the priority queue is notified of element additions or removals, it pauses processing of the current element list and refreshes the queue. minItemsBeforeRefresh specifies the minimum number of items in the currently active list before such a refresh;
set it to the amount of unordered processing your program can tolerate.
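
The smaller-priority-first ordering can be tried in-process with the JDK's PriorityBlockingQueue, used here as a local stand-in for a DistributedPriorityQueue with a backlog of items:

```java
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityOrderDemo {
    public static String drainOrder() throws InterruptedException {
        // Natural int ordering: smaller numbers come out first,
        // just as lower-priority values are consumed first in Curator.
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.put(50);
        queue.put(10);
        queue.put(90);
        StringBuilder order = new StringBuilder();
        while (!queue.isEmpty()) {
            order.append(queue.take()).append(' ');
        }
        return order.toString().trim(); // "10 50 90"
    }
}
```

As with the distributed version, the ordering only matters for items that are actually sitting in the queue together.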

Putting into the queue requires a priority:

queue.put(aMessage, priority);

Example:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

Sometimes you may get the impression that the priority setting has no effect. That is because priority only matters for elements backlogged in the queue: if consumption is fast enough, an earlier element may already have been consumed before the next one is even enqueued, and in that situation DistributedPriorityQueue degenerates into DistributedQueue.

Distributed Delay Queue—DistributedDelayQueue

The JDK also has a DelayQueue, which you may be familiar with.
DistributedDelayQueue provides similar functionality: each element carries a delay value,
and the consumer only receives an element after its delay has elapsed. It involves the following four classes.

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

It is created with the following statements:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

When putting an element, you specify delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

Note that delayUntilEpoch is not an interval from now,
such as 20 milliseconds, but a future timestamp, e.g. System.currentTimeMillis() + 10000 for 10 seconds from now.
If delayUntilEpoch is already in the past, the message is consumed immediately.

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

Several queues have now been covered, but as you may have noticed, none of them implements a JDK-like interface.
SimpleDistributedQueue provides an interface essentially identical to the JDK's (though it does not implement the Queue interface).
Creating one is simple:

public SimpleDistributedQueue(CuratorFramework client,String path)

Adding an element:

public boolean offer(byte[] data) throws Exception

Taking an element:

public byte[] take() throws Exception

Other methods are also provided:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method, but there is a take method.

take() blocks until it returns successfully,
while poll() returns null immediately when the queue is empty.
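
This take/poll contract matches the JDK's BlockingQueue, so the difference can be seen in-process with a LinkedBlockingQueue as a stand-in:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TakePollDemo {
    public static boolean demo() throws InterruptedException {
        LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        // poll() on an empty queue returns null immediately...
        if (queue.poll() != null) return false;
        // ...and the timed poll gives up after the timeout.
        if (queue.poll(50, TimeUnit.MILLISECONDS) != null) return false;
        queue.offer("data".getBytes());
        // take() blocks until an element is available; here one already is.
        return new String(queue.take()).equals("data");
    }
}
```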

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("sent message successfully: " + "zjc-" + i);
                    } else {
                        System.out.println("failed to send message: " + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("consumed message successfully: " + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

In practice, however, 100 messages were sent, and after the first was consumed the rest could not be consumed (note that the Consumer above calls take() only once, so by itself it can consume at most one message); the cause of the blocking was not found. The demo recommended by the official documentation uses the following APIs:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

But in actual use, consumption was still observed to block.

Distributed Barrier—Barrier

分布式Barrier是那般一个类:
它会阻塞全体节点上的等候进程,直到某五个被满意,
然后具备的节点继续开展。

For example, in a horse race, the horses arrive at the starting line one after another.
At the starting signal, all the horses burst out at once.

DistributedBarrier

The DistributedBarrier class implements the barrier. Its constructor is:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

First you set the barrier; it will block the threads that wait on it:

setBarrier();

Then the threads that need to block call the method that waits for the barrier to be lifted:

public void waitOnBarrier()

When the condition is satisfied, remove the barrier and all waiting threads continue:

removeBarrier();

Error handling: DistributedBarrier
monitors the connection state; when the connection is lost, waitOnBarrier() throws an exception.
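
The set/remove-barrier pattern behaves like a one-shot gate, which can be sketched locally with the JDK's CountDownLatch (countDown playing the role of removeBarrier):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class GateDemo {
    public static int demo() throws InterruptedException {
        CountDownLatch barrier = new CountDownLatch(1); // like setBarrier()
        AtomicInteger released = new AtomicInteger();
        Thread[] waiters = new Thread[5];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    barrier.await();            // like waitOnBarrier()
                    released.incrementAndGet(); // runs only after the gate opens
                } catch (InterruptedException ignored) {
                }
            });
            waiters[i].start();
        }
        barrier.countDown();                    // like removeBarrier()
        for (Thread t : waiters) t.join();
        return released.get(); // all 5 waiters proceed
    }
}
```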

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

This example creates a controlBarrier to set and remove the barrier.
We create five threads that wait on this barrier.
Only after the barrier is removed do all the threads continue.

If you never set the barrier, none of the threads block.

Double Barrier—DistributedDoubleBarrier

A double barrier lets clients synchronize at both the start and the end of a computation: when enough processes have entered the double barrier, the computation starts,
and they leave the barrier when it completes. The double barrier class is DistributedDoubleBarrier.
Its constructor is:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty is the number of members. When enter() is called, the member blocks until all members have called enter().
When leave() is called, it likewise blocks the calling thread until all members have called leave().
It is like a 100-meter race: when the gun fires,
all runners start, and the race is over only when every runner has crossed the finish line.

DistributedDoubleBarrier monitors the connection state; when the connection is lost, enter() and leave() throw exceptions.
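
The enter/leave pair can be sketched in-process with two JDK CyclicBarriers (a local stand-in, not Curator code): no thread starts the work phase until all have entered, and none finishes until all leave.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class DoubleBarrierSketch {
    public static int demo() throws InterruptedException {
        int members = 3;
        CyclicBarrier enter = new CyclicBarrier(members); // like enter()
        CyclicBarrier leave = new CyclicBarrier(members); // like leave()
        AtomicInteger worked = new AtomicInteger();
        Thread[] threads = new Thread[members];
        for (int i = 0; i < members; i++) {
            threads[i] = new Thread(() -> {
                try {
                    enter.await();             // block until all members enter
                    worked.incrementAndGet();  // the synchronized work phase
                    leave.await();             // block until all members leave
                } catch (InterruptedException | BrokenBarrierException ignored) {
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        return worked.get(); // every member completed the work phase
    }
}
```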

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }
    }

}

References:
"From Paxos to ZooKeeper: Distributed Consistency Principles and Practice"
The blog series "Learning ZooKeeper Usage by Example"

Project repository:
https://github.com/zjcscut/curator-seed
(The Markdown source of this article contains a [toc] navigation, which makes it easy to jump between sections, but Jianshu does not support it. The original MD file is in the project's /resources/md directory; it was written with Typora, which is also recommended for reading it.)

End on 2017-5-13 13:10.
Help yourselves!
I'm throwable, working hard in Guangzhou: a day job, occasional overtime on evenings and weekends, and blog posts written in whatever spare time is left.
I hope my articles bring you something of value. Let's keep at it together.
