Gossip 事始め
株式会社ムロドー
とみたかずたか
アジェンダ
▫ Gossip 事始め
コードリーディングの準備
デーモン起動
起動準備
デーモン初期化
Gossip 開始
コードリーディング下準備
• Eclpse で step by step 実行を行う。
用意するもの
• 面倒くさいので pleiades on Glileo を用意
• Apache Cassandra 0.6.3
• Cassandra クラスター
Eclpse のセッティング
• step by step 実行の設定で結構手間取った。
Ecplse を適当なディレクトリに展開し起動を行う。
Eclpse のセッティング
Ecplse で Cassandra のソースを読み込みプロジェクトを
作成。
Cassandra のソ
ースディレクト
リを直接指定。
直接 build を行う為、要
注意。 Bin ディレクトリ
を吹っ飛ばしたり別途ソ
ースを使いまわすことが
不可能になります。
Eclpse のセッティング
Cassandra の Build 。
Build.xml を右クリック
し実行タブから外部ツー
ルの構成を選択。
Target から build と jar を
選択
Eclpse のセッティング
Cassandra の実行構成の設定 Eclipse 上部から実行→
実行の構成を選択し構成
の作成、管理画面を表示
メインクラスに
「 org.apache.cassandra.thrift.CassandraDaemon 」を指定
Eclpse のセッティング
Cassandra の実行構成の設定
引き続き引数タブを選択
引数に -Dcom.sun.management.jmxremote.port=8080
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false
-Dcassandra
-Dstorage-config="C:\Users\kazutaka\Downloads\apache-cassandra-0.6.3-src\conf"
-Dcassandra-foreground=yes
を設定(少なくとも「 Dstorage-config 」は必要なようです。
Eclpse のセッティング
Cassandra の Step By Step 実行
Debug のパースペクティブを開きデバック実行
デーモン起動
public static void main(String[] args)
{
CassandraDaemon daemon = new CassandraDaemon();
String pidFile = System.getProperty("cassandra-
pidfile");
try
{
daemon.setup();
Main 関数直後に setup メソッドがあります。
203
204
205
206
207
208
209
210
211
212
デーモン起動
public static void main(String[] args)
{
CassandraDaemon daemon = new CassandraDaemon();
String pidFile = System.getProperty("cassandra-
pidfile");
try
{
daemon.setup();
Main 直後に setup メソッドがあります。
203
204
205
206
207
208
209
210
211
212
デーモン起動
63 private void setup() throws IOException, TTransportException
64 {
~~
~~
69 int listenPort = DatabaseDescriptor.getThriftPort();
70 InetAddress listenAddr = DatabaseDescriptor.getThriftAddress();
~~
~~
92 try
93 {
94 SystemTable.checkHealth();
95 }
~~
~~
111 CommitLog.recover();
112 CompactionManager.instance.checkAllColumnFamilies();
~~
~~
114 // start server internals
115 StorageService.instance.initServer();
Setup 内サーバー初期化まで。
ローカルの IP
ポートを取得 と
システムテーブ
ルのチェック
Commit ログの読み込
みと内容のチェック、
リカバリーなど
初期化開始
ここまで org.apache.cassandra.thrift.CassandraDaemon
デーモン起動
63 private void setup() throws IOException, TTransportException
64 {
~~
~~
69 int listenPort = DatabaseDescriptor.getThriftPort();
70 InetAddress listenAddr = DatabaseDescriptor.getThriftAddress();
~~
~~
92 try
93 {
94 SystemTable.checkHealth();
95 }
~~
~~
111 CommitLog.recover();
112 CompactionManager.instance.checkAllColumnFamilies();
~~
~~
114 // start server internals
115 StorageService.instance.initServer();
Setup 内サーバー初期化まで。
ローカルの IP
ポートを取得 と
システムテーブ
ルのチェック
Commit ログの読み込
みと内容のチェック、
リカバリーなど
初期化開始
ここまで org.apache.cassandra.thrift.CassandraDaemon
相互認知
• org.apache.cassandra.service.StorageService
initServer
304 initialized = true; 305 isClientMode = false;
306 storageMetadata_ = SystemTable.initMetadata(); 307
308 // be certain that the recorded clustername matches what the user specified 309 if (!
(Arrays.equals(storageMetadata_.getClusterName(),DatabaseDescriptor.getClusterName().getBytes()))) 310 {
311 logger_.error("ClusterName mismatch: " + new String(storageMetadata_.getClusterName()) + " != " +
312 DatabaseDescriptor.getClusterName()); 313 System.exit(3);
314 } 315
316 DatabaseDescriptor.createAllDirectories();
最初に自分自身の初期化を行う。
クラスターネーム、キースペース、トークンなどメタ情報取得など
ゴシップ開始
• org.apache.cassandra.service.StorageService
327 logger_.info("Starting up server gossip"); 328
329 // have to start the gossip service before we can see any info on other nodes. this is necessary
330 // for bootstrap to get the load info it needs.
331 // (we won't be part of the storage ring though until we add a nodeId to our state, below.) 332 Gossiper.instance.register(this);
333 Gossiper.instance.start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration()); // needed for node-ring gathering.
「 start 」の最初にローカルのアドレスを取得。
( FBUtilities.getLocalAddress )
私たちが、他のノードの情報を見ることができる前にゴシップサービスは始め
なければなりません。
これは、「 bootstrap 」が必要とする負荷の情報を得る為に必要です。
もっとも、私たちは「 nodeid 」を私たちの state に追加するまでストレージ
・リングの一部でなくなるでしょう。
ゴシップ開始 2
• org.apache.cassandra.gms.Gossiper
838 public void start(InetAddress localEndPoint, int generationNbr) 839 {
840 localEndPoint_ = localEndPoint;
841 /* Get the seeds from the config and initialize them. */ 842 Set<InetAddress> seedHosts = DatabaseDescriptor.getSeeds(); 843 for (InetAddress seed : seedHosts)
844 {
845 if (seed.equals(localEndPoint)) 846 continue;
847 seeds_.add(seed); 848 }
Seeds より自分以外の IP を取得
ゴシップ開始 3
• org.apache.cassandra.gms.Gossiper
351 /* initialize the heartbeat state for this localEndPoint */
352 EndPointState localState = endPointStateMap_.get(localEndPoint_); 353 if ( localState == null )
354 {
355 HeartBeatState hbState = new HeartBeatState(generationNbr); 356 localState = new EndPointState(hbState);
357 localState.isAlive(true); 358 localState.isAGossiper(true);
359 endPointStateMap_.put(localEndPoint_, localState); 360 }
自分のハートビートステータスを取得
ゴシップ開始 4
• org.apache.cassandra.gms.Gossiper
361 /* starts a timer thread */
362 gossipTimer_.schedule( new GossipTimerTask(), Gossiper.intervalInMillis_, Gossiper.intervalInMillis_);
GossipTimerTask 起動!
105 public final static int intervalInMillis_ = 1000;
というところで 1000 ミリ秒間隔でタイマー起動
GossipTimerTask
• org.apache.cassandra.gms.Gossiper
private class GossipTimerTask extends TimerTask {
public void run() {
try {
MessagingService.instance.waitUntilListening(); synchronized( Gossiper.instance )
{
endPointStateMap_.get(localEndPoint_).getHeartBeatState().updateHeartBeat(); List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
Gossiper.instance.makeRandomGossipDigest(gDigests); if ( gDigests.size() > 0 )
{
Message message = makeGossipDigestSynMessage(gDigests); boolean gossipedToSeed = doGossipToLiveMember(message); doGossipToUnreachableMember(message);
if (!gossipedToSeed || liveEndpoints_.size() < seeds_.size()) doGossipToSeed(message);
if (logger_.isTraceEnabled())
logger_.trace("Performing status check ..."); doStatusCheck();
} } }
catch (Exception e) {
throw new RuntimeException(e); }
} }