Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Sign in
Toggle navigation
R
Registration and Discovery Zookeeper
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
mohamadbashar.disoki
Registration and Discovery Zookeeper
Commits
fc503baf
Commit
fc503baf
authored
Jan 08, 2024
by
Mohamad Bashar Desoki
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Cluster Management, Registration and Discovery
parents
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
379 additions
and
0 deletions
+379
-0
.gitignore
.gitignore
+39
-0
README.md
README.md
+8
-0
pom.xml
pom.xml
+37
-0
Application.java
src/main/java/Application.java
+68
-0
LeaderElection.java
src/main/java/LeaderElection.java
+73
-0
OnElectionAction.java
src/main/java/OnElectionAction.java
+39
-0
OnElectionCallback.java
src/main/java/OnElectionCallback.java
+6
-0
ServiceRegistry.java
src/main/java/ServiceRegistry.java
+95
-0
logback.xml
src/main/resources/logback.xml
+14
-0
No files found.
.gitignore
0 → 100644
View file @
fc503baf
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
\ No newline at end of file
README.md
0 → 100644
View file @
fc503baf
# Cluster Management, Registration and Discovery
*topics*
*
[
Logback vs SLF4J vs Log4J2 - what is the difference?
](
https://www.youtube.com/watch?v=SWHYrCXIL38&ab_channel=JavaBrains
)
*
Introduction to Service Registry & Service Discovery
*
Implement Service Registry with Zookeeper
*
Integrate into our leader-worker architecture
\ No newline at end of file
pom.xml
0 → 100644
View file @
fc503baf
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<groupId>
org.example
</groupId>
<artifactId>
Service-Registration-and-Discovery
</artifactId>
<version>
1.0-SNAPSHOT
</version>
<properties>
<maven.compiler.source>
17
</maven.compiler.source>
<maven.compiler.target>
17
</maven.compiler.target>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<version>
3.11.0
</version>
<configuration>
<source>
17
</source>
<target>
17
</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>
org.apache.zookeeper
</groupId>
<artifactId>
zookeeper
</artifactId>
<version>
3.9.1
</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
src/main/java/Application.java
0 → 100644
View file @
fc503baf
import
org.apache.zookeeper.KeeperException
;
import
org.apache.zookeeper.WatchedEvent
;
import
org.apache.zookeeper.Watcher
;
import
org.apache.zookeeper.ZooKeeper
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.io.IOException
;
public
class
Application
implements
Watcher
{
private
static
final
String
address
=
"172.29.3.101:2181"
;
private
static
final
int
SESSION_TIMEOUT
=
3000
;
//dead client
private
static
final
int
DEFAULT_PORT
=
8080
;
private
ZooKeeper
zooKeeper
;
public
static
void
main
(
String
[]
args
)
throws
IOException
,
InterruptedException
,
KeeperException
{
int
currentServerPort
=
args
.
length
==
1
?
Integer
.
parseInt
(
args
[
0
])
:
DEFAULT_PORT
;
Application
application
=
new
Application
();
ZooKeeper
zooKeeper
=
application
.
connectToZookeeper
();
ServiceRegistry
serviceRegistry
=
new
ServiceRegistry
(
zooKeeper
);
OnElectionAction
onElectionAction
=
new
OnElectionAction
(
serviceRegistry
,
currentServerPort
);
LeaderElection
leaderElection
=
new
LeaderElection
(
zooKeeper
,
onElectionAction
);
leaderElection
.
volunteerForLeadership
();
leaderElection
.
reelectLeader
();
application
.
run
();
application
.
close
();
}
public
ZooKeeper
connectToZookeeper
()
throws
IOException
{
this
.
zooKeeper
=
new
ZooKeeper
(
address
,
SESSION_TIMEOUT
,
this
);
return
zooKeeper
;
}
public
void
run
()
throws
InterruptedException
{
synchronized
(
zooKeeper
)
{
zooKeeper
.
wait
();
}
}
private
void
close
()
throws
InterruptedException
{
this
.
zooKeeper
.
close
();
}
@Override
public
void
process
(
WatchedEvent
watchedEvent
)
{
switch
(
watchedEvent
.
getType
())
{
case
None:
if
(
watchedEvent
.
getState
()
==
Event
.
KeeperState
.
SyncConnected
)
{
System
.
out
.
println
(
"Successfully connected to Zookeeper"
);
}
else
if
(
watchedEvent
.
getState
()
==
Event
.
KeeperState
.
Disconnected
)
{
synchronized
(
zooKeeper
)
{
System
.
out
.
println
(
"Disconnected from Zookeeper"
);
zooKeeper
.
notifyAll
();
}
}
else
if
(
watchedEvent
.
getState
()
==
Event
.
KeeperState
.
Closed
)
{
System
.
out
.
println
(
"Closed Successfully"
);
}
break
;
}
}
}
src/main/java/LeaderElection.java
0 → 100644
View file @
fc503baf
import
org.apache.zookeeper.*
;
import
org.apache.zookeeper.data.Stat
;
import
java.util.Collections
;
import
java.util.List
;
public
class
LeaderElection
implements
Watcher
{
private
static
final
String
ELECTION_NAMESPACE
=
"/election"
;
private
String
currentZnodeName
;
private
ZooKeeper
zooKeeper
;
private
OnElectionCallback
onElectionCallback
;
public
LeaderElection
(
ZooKeeper
zooKeeper
,
OnElectionCallback
onElectionCallback
)
{
this
.
zooKeeper
=
zooKeeper
;
this
.
onElectionCallback
=
onElectionCallback
;
}
public
void
volunteerForLeadership
()
throws
InterruptedException
,
KeeperException
{
String
znodePrefix
=
ELECTION_NAMESPACE
+
"/c_"
;
String
znodeFullPath
=
zooKeeper
.
create
(
znodePrefix
,
new
byte
[]{},
ZooDefs
.
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
EPHEMERAL_SEQUENTIAL
);
System
.
out
.
println
(
znodeFullPath
);
this
.
currentZnodeName
=
znodeFullPath
.
replace
(
ELECTION_NAMESPACE
+
"/"
,
""
);
}
public
void
reelectLeader
()
throws
InterruptedException
,
KeeperException
{
String
predecessorName
=
""
;
Stat
predecessorStat
=
null
;
//this while to guarantee get predecessor even if it deleted just before zookeeper.exist
while
(
predecessorStat
==
null
)
{
List
<
String
>
children
=
zooKeeper
.
getChildren
(
ELECTION_NAMESPACE
,
false
);
Collections
.
sort
(
children
);
String
smallestChild
=
children
.
get
(
0
);
//the first element
if
(
smallestChild
.
equals
(
currentZnodeName
))
{
System
.
out
.
println
(
"I'm a leader"
);
onElectionCallback
.
onElectedToBeLeader
();
return
;
}
else
{
System
.
out
.
println
(
"I'm not a leader"
);
int
predecessorIndex
=
children
.
indexOf
(
currentZnodeName
)
-
1
;
predecessorName
=
children
.
get
(
predecessorIndex
);
predecessorStat
=
zooKeeper
.
exists
(
ELECTION_NAMESPACE
+
"/"
+
predecessorName
,
this
);
}
}
onElectionCallback
.
onWorker
();
System
.
out
.
println
(
"Watching znode "
+
predecessorName
);
System
.
out
.
println
();
}
@Override
public
void
process
(
WatchedEvent
watchedEvent
)
{
switch
(
watchedEvent
.
getType
())
{
case
NodeDeleted:
try
{
reelectLeader
();
}
catch
(
InterruptedException
e
)
{
throw
new
RuntimeException
(
e
);
}
catch
(
KeeperException
e
)
{
throw
new
RuntimeException
(
e
);
}
break
;
}
}
}
src/main/java/OnElectionAction.java
0 → 100644
View file @
fc503baf
import
org.apache.zookeeper.KeeperException
;
import
java.net.InetAddress
;
import
java.net.UnknownHostException
;
public
class
OnElectionAction
implements
OnElectionCallback
{
private
final
ServiceRegistry
serviceRegistry
;
private
final
int
port
;
public
OnElectionAction
(
ServiceRegistry
serviceRegistry
,
int
port
)
{
this
.
serviceRegistry
=
serviceRegistry
;
this
.
port
=
port
;
}
@Override
public
void
onElectedToBeLeader
()
{
serviceRegistry
.
unregisterFromCluster
();
serviceRegistry
.
registerForUpdates
();
}
@Override
public
void
onWorker
()
{
try
{
String
currentServerAddress
=
String
.
format
(
"http://%s:%d"
,
InetAddress
.
getLocalHost
().
getCanonicalHostName
(),
port
);
serviceRegistry
.
registerToCluster
(
currentServerAddress
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
catch
(
UnknownHostException
e
)
{
e
.
printStackTrace
();
}
catch
(
KeeperException
e
)
{
e
.
printStackTrace
();
}
}
}
src/main/java/OnElectionCallback.java
0 → 100644
View file @
fc503baf
public
interface
OnElectionCallback
{
void
onElectedToBeLeader
();
void
onWorker
();
}
src/main/java/ServiceRegistry.java
0 → 100644
View file @
fc503baf
import
org.apache.zookeeper.*
;
import
org.apache.zookeeper.data.Stat
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.List
;
public
class
ServiceRegistry
implements
Watcher
{
private
static
final
String
REGISTRY_ZNODE
=
"/service_registry"
;
private
final
ZooKeeper
zooKeeper
;
private
String
currentZnode
=
null
;
private
List
<
String
>
allServiceAddresses
=
null
;
public
ServiceRegistry
(
ZooKeeper
zooKeeper
)
{
this
.
zooKeeper
=
zooKeeper
;
createServiceRegistryZnode
();
}
private
void
createServiceRegistryZnode
()
{
try
{
if
(
zooKeeper
.
exists
(
REGISTRY_ZNODE
,
false
)
==
null
)
{
zooKeeper
.
create
(
REGISTRY_ZNODE
,
new
byte
[]{},
ZooDefs
.
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
PERSISTENT
);
}
}
catch
(
KeeperException
e
)
{
e
.
printStackTrace
();
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
}
public
void
registerToCluster
(
String
metadata
)
throws
KeeperException
,
InterruptedException
{
if
(
this
.
currentZnode
!=
null
)
{
System
.
out
.
println
(
"Already registered to service registry"
);
return
;
}
this
.
currentZnode
=
zooKeeper
.
create
(
REGISTRY_ZNODE
+
"/n_"
,
metadata
.
getBytes
(),
ZooDefs
.
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
EPHEMERAL_SEQUENTIAL
);
System
.
out
.
println
(
"Registered to service registry"
);
}
public
void
registerForUpdates
()
{
try
{
updateAddresses
();
}
catch
(
KeeperException
e
)
{
e
.
printStackTrace
();
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
}
public
void
unregisterFromCluster
()
{
try
{
if
(
currentZnode
!=
null
&&
zooKeeper
.
exists
(
currentZnode
,
false
)
!=
null
)
{
zooKeeper
.
delete
(
currentZnode
,
-
1
);
}
}
catch
(
KeeperException
e
)
{
e
.
printStackTrace
();
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
}
private
synchronized
void
updateAddresses
()
throws
KeeperException
,
InterruptedException
{
List
<
String
>
workerZnodes
=
zooKeeper
.
getChildren
(
REGISTRY_ZNODE
,
this
);
List
<
String
>
addresses
=
new
ArrayList
<>(
workerZnodes
.
size
());
for
(
String
workerZnode
:
workerZnodes
)
{
String
workerFullPath
=
REGISTRY_ZNODE
+
"/"
+
workerZnode
;
Stat
stat
=
zooKeeper
.
exists
(
workerFullPath
,
false
);
if
(
stat
==
null
)
{
continue
;
}
byte
[]
addressBytes
=
zooKeeper
.
getData
(
workerFullPath
,
false
,
stat
);
String
address
=
new
String
(
addressBytes
);
addresses
.
add
(
address
);
}
this
.
allServiceAddresses
=
Collections
.
unmodifiableList
(
addresses
);
System
.
out
.
println
(
"The cluster addresses are: "
+
this
.
allServiceAddresses
);
}
@Override
public
void
process
(
WatchedEvent
watchedEvent
)
{
try
{
updateAddresses
();
}
catch
(
KeeperException
e
)
{
throw
new
RuntimeException
(
e
);
}
catch
(
InterruptedException
e
)
{
throw
new
RuntimeException
(
e
);
}
}
}
src/main/resources/logback.xml
0 → 100644
View file @
fc503baf
<?xml version="1.0" encoding="UTF-8"?>
<Configuration
status=
"WARN"
>
<Appenders>
<Console
name=
"Console"
target=
"SYSTEM_OUT"
>
<PatternLayout
pattern=
"%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
/>
</Console>
</Appenders>
<Loggers>
<Root
level=
"WARN"
>
<AppenderRef
ref=
"Console"
/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment