JAWS-UG Advent Calendar 7日目です。
昨日は高校生でも使えるAWSでした。
今日はみんなが大好きで仕方ないAmazon SimpleWorkflowService(以下SWF)のWorkerとDeciderをAWS Elastic Beanstalk(以下Beanstalk)で動かす方法についてです。
SWFといえば、分散処理をするために非常に便利なもので、Amazon SQSに比べて正確な制御がし易いという特徴を持っています。
しかしSWFでもめんどくさいところがあります。
それは新しいバージョンのデプロイの作業です。
SWFでは各ActivityとWorkflowにバージョンを持っていて、ExternalClientによって呼び出されるバージョンと、Workflowのバージョンが合っていないと実行されなかったり、当然WorkflowからActivityを呼び出すときもClientのバージョンと実装のバージョンが合っていなくても実行されません。
この管理は意外に手間で、キチンと管理してあげないとサービスが全て止まってしまうなんて事もありえます。
そこで、この管理の手間を簡単に、かつ完璧にこなすためにElastic Beanstalkを使ってみようと思います。
Beanstalkを使うことで、SWFのアプリケーションのバージョンを上げるときも、新しいEnvironmentを作りそちらにデプロイすることで、新旧両方のアプリケーションを同時稼働させる事が可能です。
そして旧バージョンでのExecutionがなくなったタイミングで古いEnvironmentをTerminateするだけで良いのです。
SWFのアプリケーションはBeanstalkで動かすのが楽ちんでよいです。
というわけで、BeanstalkでSWFのアプリケーションを動かすための方法を紹介しますが、
BeanstalkでSWFのアプリケーションを動かす方法は、いくつか有ります。
jsvcを使う方法。
Tomcatの中でThreadとして動かしてしまう方法
色々なやり方があるとは思いますが、今回は.ebextensionsとかもあまりいじらなくても良い
なんか無駄遣いしているような気がしますが、、、Tomcatの中で起動してしまう方法でやってみます。
ActivityとDeciderを動かすものを普通に作る。
SWFで普段動かしているのと同じような感じで作れば大丈夫です。
私はThreadを継承したAbstractなものを一つ作ってDeciderとActivityとそれぞれが継承してやるのが好きなので、こんな感じで作っています。
package com.tottokug.host; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient; import com.amazonaws.services.simpleworkflow.flow.WorkerBase; import com.tottokug.common.Config; public abstract class AbstractHost extends Thread { static Logger logger = LoggerFactory.getLogger(AbstractHost.class); AmazonSimpleWorkflow swfService; WorkerBase worker; boolean test; boolean halt_; static final String TEST_TASKLIST = "JUNIT"; String taskList = null; public AbstractHost() { this.test = false; this.init(); } public AbstractHost(boolean test) { this.test = test; this.init(); } protected void init() { this.swfService = new AmazonSimpleWorkflowClient(new ClasspathPropertiesFileCredentialsProvider()); this.activeTaskList = new ArrayList<String>(); this.swfService.setRegion(Config.getRegion()); this.halt_ = false; if (this.test) { this.taskList = TEST_TASKLIST; } } abstract protected WorkerBase getWorker(); public void halt() { this.halt_ = true; interrupt(); } public abstract void destroy(); protected abstract List<String> getActiveTaskLists(); List<String> activeTaskList; @Override public void run() { logger.info(this.getClass().getName() + " starting"); this.worker = getWorker(); this.worker.start(); while (true) { try { sleep(1000); } catch (InterruptedException e) { } if (this.halt_()) { break; } } } protected void updateActiveTasklist() { } static enum TaskListStatus { STOP, START, STAY } protected Map<TaskListStatus, String> tasklistCompare(List<String> newer, List<String> older) { Map<TaskListStatus, String> tasklistStatuses = new HashMap<AbstractHost.TaskListStatus, String>(); return tasklistStatuses; } protected boolean halt_() { if (this.halt_) { logger.info("halt signal received"); logger.info("shutting down worker thread. wating 1000 ms ..."); try { this.worker.shutdownAndAwaitTermination(30000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { this.worker.shutdown(); } logger.info("shutting down host"); return true; } return false; } }
JUnitでテストする時もJUnitの中でこのHostを立ち上げるので、それ用にタスクリストを分けていたりします。
package com.tottokug.host; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient; import com.amazonaws.services.simpleworkflow.flow.WorkerBase; import com.tottokug.common.Config; public abstract class AbstractHost extends Thread { static Logger logger = LoggerFactory.getLogger(AbstractHost.class); AmazonSimpleWorkflow swfService; WorkerBase worker; boolean test; boolean halt_; static final String TEST_TASKLIST = "JUNIT"; String taskList = null; public AbstractHost() { this.test = false; this.init(); } public AbstractHost(boolean test) { this.test = test; this.init(); } protected void init() { this.swfService = new AmazonSimpleWorkflowClient(new ClasspathPropertiesFileCredentialsProvider()); this.activeTaskList = new ArrayList<String>(); this.swfService.setRegion(Config.getRegion()); this.halt_ = false; if (this.test) { this.taskList = TEST_TASKLIST; } } abstract protected WorkerBase getWorker(); public void halt() { this.halt_ = true; interrupt(); } public abstract void destroy(); protected abstract List<String> getActiveTaskLists(); List<String> activeTaskList; @Override public void run() { logger.info(this.getClass().getName() + " starting"); this.worker = getWorker(); this.worker.start(); while (true) { try { sleep(1000); } catch (InterruptedException e) { } if (this.halt_()) { break; } } } protected void updateActiveTasklist() { } static enum TaskListStatus { STOP, START, STAY } protected Map<TaskListStatus, String> tasklistCompare(List<String> newer, List<String> older) { Map<TaskListStatus, String> tasklistStatuses = new HashMap<AbstractHost.TaskListStatus, String>(); return tasklistStatuses; } protected boolean halt_() { if (this.halt_) { logger.info("halt signal received"); logger.info("shutting down worker thread. wating 30000 ms ..."); try { this.worker.shutdownAndAwaitTermination(30000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { this.worker.shutdown(); } logger.info("shutting down host"); return true; } return false; } }
こんな感じで出来たら後は簡単で、Servletのロード時にこれがスレッドで立ち上がれば良いのです。
Servlet作る
次に適当にServlet作ります。
Servletのinitメソッドで無理やりThreadを立ち上げてみます。
public class WorkerStarter extends HttpServlet { // 略 ActivityHost a = new ActivityHost(); DeciderHost d = new DeciderHost(); public void init(ServletConfig config) throws ServletException { a.start(); d.start(); } public void destroy() { a.interrupt(); d.interrupt(); } // 略 }
後はこのServletがTomcat起動時に自動的に読み込まれるようにすればOKです。
load-on-startupを書いておくことで、アクセスがなくてもロードされます。
<servlet> <description></description> <display-name>WorkerStarter</display-name> <servlet-name>WorkerStarter</servlet-name> <servlet-class>com.tottokug.WorkerStarter</servlet-class> <load-on-startup>3</load-on-startup> </servlet>
デプロイ
必要なライブラリ類もWEB-INF/libに入れて、設定ファイル、log系の設定なんかも詰め込んでwarにして、
Elastic Beanstalkにデプロイすれば、ElasticBeanstalkでSWFのアプリケーションが動き始めます。
ログの出し方だけ、気をつけてあげましょう。catalina.outに出してしまうのが楽かと思います。
さいごに
試してはいませんが、全く同じ方法でKinesis Applicationも動かせるはずです。
後は今のままではAutoScalingがまともに動かないはずなので、別のAutoScalingに差し替えませう。
SWFのアプリケーションを運用する上で手間である新旧並行稼動、旧バージョンの終了等がElastic Beanstalkで動かす事によってすごくやりやすくなります。
まさにBeanstalkのEnvironmentとVersion管理機能はSWFにぴったりの機能です。
今SWFを使っている人はBeanstalkでの管理をして、
今Beanstalkを使っている人はWebアプリケーションだけでなくSWFのWorkerたちも載せてみると面白いと思います。
今度もし、機会があったら、無理やりTomcat上で動かすではなく、
jsvcで動かす方法を.ebextensionと一緒に紹介するかもしれません。