読者です 読者をやめる 読者になる 読者になる

かれ4

かれこれ4個目のブログ

AWS Elastic Beanstalkで Amazon SWFのWorker Deciderを動かす #jawsug

AWS SWF ElasticBeanstalk

JAWS-UG Advent Calendar 7日目です。

昨日は高校生でも使えるAWSでした。

今日はみんなが大好きで仕方ないAmazon SimpleWorkflowService(以下SWF)のWorkerとDeciderをAWS Elastic Beanstalk(以下Beanstalk)で動かす方法についてです。

SWFといえば、分散処理をするために非常に便利なもので、Amazon SQSに比べて正確な制御がし易いという特徴を持っています。
しかしSWFでもめんどくさいところがあります。

それは新しいバージョンのデプロイの作業です。

SWFでは各ActivityとWorkflowにバージョンを持っていて、ExternalClientによって呼び出されるバージョンと、Workflowのバージョンが合っていないと実行されなかったり、当然WorkflowからActivityを呼び出すときもClientのバージョンと実装のバージョンが合っていなくても実行されません。


この管理は意外に手間で、キチンと管理してあげないとサービスが全て止まってしまうなんて事もありえます。
そこで、この管理の手間を簡単に、かつ完璧にこなすためにElastic Beanstalkを使ってみようと思います。

Beanstalkを使うことで、SWFのアプリケーションのバージョンを上げるときも、新しいEnvironmentを作りそちらにデプロイすることで、新旧両方のアプリケーションを同時稼働させる事が可能です。

そして旧バージョンでのExecutionがなくなったタイミングで古いEnvironmentをTerminateするだけで良いのです。

SWFのアプリケーションはBeanstalkで動かすのが楽ちんでよいです。


というわけで、BeanstalkでSWFのアプリケーションを動かすための方法を紹介しますが、
BeanstalkでSWFのアプリケーションを動かす方法は、いくつか有ります。

jsvcを使う方法。
Tomcatの中でThreadとして動かしてしまう方法
色々なやり方があるとは思いますが、今回は.ebextensionsとかもあまりいじらなくても良い
なんか無駄遣いしているような気がしますが、、、Tomcatの中で起動してしまう方法でやってみます。

ActivityとDeciderを動かすものを普通に作る。

SWFで普段動かしているのと同じような感じで作れば大丈夫です。
私はThreadを継承したAbstractなものを一つ作ってDeciderとActivityとそれぞれが継承してやるのが好きなので、こんな感じで作っています。

package com.tottokug.host;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
import com.amazonaws.services.simpleworkflow.flow.WorkerBase;
import com.tottokug.common.Config;

public abstract class AbstractHost extends Thread {
  static Logger logger = LoggerFactory.getLogger(AbstractHost.class);
  AmazonSimpleWorkflow swfService;
  WorkerBase worker;
  boolean test;
  boolean halt_;
  static final String TEST_TASKLIST = "JUNIT";

  String taskList = null;

  public AbstractHost() {
    this.test = false;
    this.init();
  }

  public AbstractHost(boolean test) {
    this.test = test;
    this.init();
  }

  protected void init() {
    this.swfService = new AmazonSimpleWorkflowClient(new ClasspathPropertiesFileCredentialsProvider());
    this.activeTaskList = new ArrayList<String>();
    this.swfService.setRegion(Config.getRegion());
    this.halt_ = false;
    if (this.test) {
      this.taskList = TEST_TASKLIST;
    }
  }

  abstract protected WorkerBase getWorker();

  public void halt() {
    this.halt_ = true;
    interrupt();
  }

  public abstract void destroy();

  protected abstract List<String> getActiveTaskLists();

  List<String> activeTaskList;

  @Override
  public void run() {
    logger.info(this.getClass().getName() + " starting");
    this.worker = getWorker();
    this.worker.start();
    while (true) {
      try {
        sleep(1000);
      } catch (InterruptedException e) {
      }
      if (this.halt_()) {
        break;
      }
    }
  }

  protected void updateActiveTasklist() {

  }

  static enum TaskListStatus {
    STOP, START, STAY
  }

  protected Map<TaskListStatus, String> tasklistCompare(List<String> newer, List<String> older) {
    Map<TaskListStatus, String> tasklistStatuses = new HashMap<AbstractHost.TaskListStatus, String>();
    return tasklistStatuses;
  }

  protected boolean halt_() {
    if (this.halt_) {
      logger.info("halt signal received");
      logger.info("shutting down worker thread. wating 1000 ms ...");
      try {
        this.worker.shutdownAndAwaitTermination(30000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        this.worker.shutdown();
      }
      logger.info("shutting down host");
      return true;
    }
    return false;
  }
}


JUnitでテストする時もJUnitの中でこのHostを立ち上げるので、それ用にタスクリストを分けていたりします。

package com.tottokug.host;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
import com.amazonaws.services.simpleworkflow.flow.WorkerBase;
import com.tottokug.common.Config;

public abstract class AbstractHost extends Thread {
  static Logger logger = LoggerFactory.getLogger(AbstractHost.class);
  AmazonSimpleWorkflow swfService;
  WorkerBase worker;
  boolean test;
  boolean halt_;
  static final String TEST_TASKLIST = "JUNIT";

  String taskList = null;

  public AbstractHost() {
    this.test = false;
    this.init();
  }

  public AbstractHost(boolean test) {
    this.test = test;
    this.init();
  }

  protected void init() {
    this.swfService = new AmazonSimpleWorkflowClient(new ClasspathPropertiesFileCredentialsProvider());
    this.activeTaskList = new ArrayList<String>();
    this.swfService.setRegion(Config.getRegion());
    this.halt_ = false;
    if (this.test) {
      this.taskList = TEST_TASKLIST;
    }
  }

  abstract protected WorkerBase getWorker();

  public void halt() {
    this.halt_ = true;
    interrupt();
  }

  public abstract void destroy();

  protected abstract List<String> getActiveTaskLists();

  List<String> activeTaskList;

  @Override
  public void run() {
    logger.info(this.getClass().getName() + " starting");
    this.worker = getWorker();
    this.worker.start();
    while (true) {
      try {
        sleep(1000);
      } catch (InterruptedException e) {
      }
      if (this.halt_()) {
        break;
      }
    }
  }

  protected void updateActiveTasklist() {

  }

  static enum TaskListStatus {
    STOP, START, STAY
  }

  protected Map<TaskListStatus, String> tasklistCompare(List<String> newer, List<String> older) {
    Map<TaskListStatus, String> tasklistStatuses = new HashMap<AbstractHost.TaskListStatus, String>();
    return tasklistStatuses;
  }

  protected boolean halt_() {
    if (this.halt_) {
      logger.info("halt signal received");
      logger.info("shutting down worker thread. wating 30000 ms ...");
      try {
        this.worker.shutdownAndAwaitTermination(30000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        this.worker.shutdown();
      }
      logger.info("shutting down host");
      return true;
    }
    return false;
  }

}

こんな感じで出来たら後は簡単で、Servletのロード時にこれがスレッドで立ち上がれば良いのです。

Servlet作る

次に適当にServlet作ります。
Servletのinitメソッドで無理やりThreadを立ち上げてみます。

public class WorkerStarter extends HttpServlet {
 // 略
 ActivityHost a = new ActivityHost();
 DeciderHost d = new DeciderHost();
   
 public void init(ServletConfig config) throws ServletException {
    a.start();
    d.start();
  }
  
  public void destroy() {
    a.interrupt();
    d.interrupt();
  }
  // 略  
}

後はこのServletTomcat起動時に自動的に読み込まれるようにすればOKです。
load-on-startupを書いておくことで、アクセスがなくてもロードされます。

<servlet>
	<description></description>
	<display-name>WorkerStarter</display-name>
	<servlet-name>WorkerStarter</servlet-name>
	<servlet-class>com.tottokug.WorkerStarter</servlet-class>
	<load-on-startup>3</load-on-startup>
</servlet>

デプロイ

必要なライブラリ類もWEB-INF/libに入れて、設定ファイル、log系の設定なんかも詰め込んでwarにして、
Elastic Beanstalkにデプロイすれば、ElasticBeanstalkでSWFのアプリケーションが動き始めます。
ログの出し方だけ、気をつけてあげましょう。catalina.outに出してしまうのが楽かと思います。


さいごに

試してはいませんが、全く同じ方法でKinesis Applicationも動かせるはずです。
後は今のままではAutoScalingがまともに動かないはずなので、別のAutoScalingに差し替えませう。

SWFのアプリケーションを運用する上で手間である新旧並行稼動、旧バージョンの終了等がElastic Beanstalkで動かす事によってすごくやりやすくなります。
まさにBeanstalkのEnvironmentとVersion管理機能はSWFにぴったりの機能です。

今SWFを使っている人はBeanstalkでの管理をして、
今Beanstalkを使っている人はWebアプリケーションだけでなくSWFのWorkerたちも載せてみると面白いと思います。

今度もし、機会があったら、無理やりTomcat上で動かすではなく、
jsvcで動かす方法を.ebextensionと一緒に紹介するかもしれません。