くらしのマーケット開発ブログ

「くらしのマーケット」を運営する、みんなのマーケット株式会社のテックブログ。積極採用中です。

ExpressJSフレームワークの紹介

はじめに

こんにちは、みんなのマーケットのテックチームのクイです。
今回、ExpressJSフレームワークと弊社の利用の仕方を紹介します。 みんなのマーケット

ExpressJSとは

サーバーサイドJavaScriptのNode.jsのWebアプリケーションフレームワークである --wikipedia

ExpressJSは軽いウェブフレームワークですが、弊社のシステムの要求に応じてカスタムしやすいです。なので選択して利用しました。

特徴

  • 高速、柔軟、最小限のウェブフレームワーク。
  • 様々なnpmパケージに提供されています。

単純なフレームワークですから、ExpressJSを利用する時色々な処理を自分で実装しなければなりません。それはExpressJSの欠点だと思います。

NodeJSのHTTPモジュールからExpressJSのルートまで

  • NodeJSのHTTPサーバーモジュールはHTTP(Hyper Text Transfer Protocol)メソッドでデータを送信し、受信するというモジュールです。HTTPの様々な機能をサポートできるように設計がされます。
  • まずはExpressJSを使わないで、サーバーを書いてみます:サーバーが3000ポートで動いています。
const http = require("http");
const server = http.createServer();
const callback = function(request, response) {
    response.writeHead(200, {
      "Content-Type": "text/plain"
    });
    response.end("Hello world!\n");
};
server.on("request", callback);
server.listen(3000);

http://localhost:3000にアクセスしたらHello world!文字列をもらえます。

  • リクエスト処理の方法:
    上のコードを見たら、毎回サーバーがリクエストを受けると、callback関数が実行されます。なのでcallbackみたいな関数はリクエスト処理が可能になります。
    だが色々な処理ならどうするのか。幸いにもNodeJSのEventEmitクラスがその問題を解決できます。EventEmitEventEmit.emit()を呼び出してEventEmit.on()の処理をアタッチするクラスです。
    ExpressJSのコードも同じです。以下はExpressJSのコードからの抜粋です。
function createApplication() {
    var app = function(req, res, next) {
        app.handle(req, res, next);
    };

    mixin(app, EventEmitter.prototype, false); // merge-descriptorsパケージでEventEmitterとappの記述をマージします。
    mixin(app, proto, false);

    // expose the prototype that will get set on requests
    app.request = Object.create(req, {
      app: { configurable: true, enumerable: true, writable: true, value: app }
    })

    // expose the prototype that will get set on responses
    app.response = Object.create(res, {
      app: { configurable: true, enumerable: true, writable: true, value: app }
    })

    app.init();  // protoのinit()メソッドがあるから、マージした後appも用いられるようになりました。
    return app;
}

app変数はEventEmitterの継承です。サーバーがリクエストを受ける時

function(req, res, next) {
    // 処理
};

みたいなハンドルで処理します。
ExpressJSに上みたいな関数はMiddlewareと呼ばれます。

  • ExpressJSMiddleware:
    Middleware関数は要求オブジェクト、応答オブジェクト、次のミドルウェアの関数に対する権限を持つ関数です。
    Middleware関数のできること:
    • ロジックを更新可能。
    • 要求オブジェクト、応答オブジェクトを変更可能。
    • 要求応答サイクルをストップ可能。
    • 次のミドルウェアを呼び出す可能。 要求オブジェクトreqはhttp.IncomingMessageが継承され、色々な便利なメソッドが追加されています。
      例えば:
header(name: string): string;
accepts(...type: string[]): string | boolean;
acceptsCharsets(...charset: string[]): string | boolean;
acceptsEncodings(...encoding: string[]): string | boolean;
param(name: string, defaultValue?: any): string;
...

      応答オブジェクトnextは応答ステータスを設定できるとか、
      テキストあるいはファイルなど応答もできます。

status(code: number): Response;
sendFile(path: string, options: any, fn: Errback): void;
render(view: string, callback?: (err: Error, html: string) => void): void;
redirect(url: string, status: number): void;

ExpressJSにおけるスタックで一連のミドルウェアを保存します。next()関数を実行する時スタック内の次のミドルウェアを呼び出します。
次の例はミドルウェアをバインドするケースです。

  • アプリケーションレベルのミドルウェア:
const express = require("express");
const app = express();
app.use(function(req, res, next) {
    // 処理
    next();
});

アプリケーションレベルのAPI

  • エラー処理ミドルウェア:
app.use(function(err, req, res, next) {
    // エラー処理
});
  • ルートレベルのミドルウェア:
const app = express();
const router = express.Router();

router.get("/index/", function(req, res, next) {
    res.render("index");
});
app.use(router);

ルートのAPI

  • サードパーティのミドルウェア:
const csurf = require('csurf');
const express = require("express");
const app = express();
app.use(cookieParser());

ExpressJS + Typescript

以下は弊社の利用方法の一つです。
TypescriptDecoratorでルートとかミドルウェアなどをバインドします。
まずDecoratorを作成します。

export class Router {
    public static get(path: string): MethodDecorator {
        return (target: Function, key: string, descriptor: TypedPropertyDescriptor<any>) => {
            this.defineMethod(Method.GET, path, descriptor.value);
            return descriptor;
        };
    }

    private static defineMethod(method: Method, path: string, target: Object): void {
        let metadata: IMetadata = <IMetadata>Reflect.getMetadata(METHOD_METADATA, target);
        if (!metadata) {
            metadata = {
                urls: [],
                before: [],
                after: []
            };
        }
        metadata.urls.push({
            path: path,
            method: method
        });
        Reflect.defineMetadata(METHOD_METADATA, metadata, target); // TypescriptのReflectでメソッドの`metadata`を保存する
    }
}

Decoratorを利用して実装する。

import { Router } from "../../vendor/router";

export class HomeController {
    constructor() {
    }
    @Router.get("/")
    @Router.get("/index")
    public async index(req: express.Request, res: express.Response, next: express.NextFunction): Promise<any> {
        return res.json({
            message: "Hello world"
        });
    }
}

ルートのミドルウェアをバインドします。

export class Route {
    public static resolve(dir: string, router: express.Router): void {
        klawAsync(dir, { nodir: true }) // コントローラディレクトリのファイル一覧を探す。
        .map(file => file.path)
        .filter(file => file.endsWith(".js")) // javascriptファイルをフィルタリングする。
        .forEach(file => {
            const module: Object = require(file); // javascriptファイルのモジュールをインポートする。
            const controllerContainer: string[] = [];

            Object.keys(module) // 各モジュール名を取得する
            .filter(m => m.endsWith("Controller")) // コントローラをフィルタリングする。
            .forEach(m => {
                if (controllerContainer.indexOf(m) !== -1) return; // もしコントローラが登録されたら次のコントローラを処理する。
                controllerContainer.push(m);
                const instance = new module[m](); // コントローラインスタンスを作成する。

                Object.getOwnPropertyNames(Object.getPrototypeOf(instance)) // コントローラのメソッドを取得する。
                .filter(method => method != 'constructor') //constructorメソッドが無視される。
                .forEach(method => {
                    const metadata = <IMetadata>Reflect.getMetadata(METHOD_METADATA, instance[method]); // メソッドの定義されたmetadataを取得する
                    const middleware = async (req, res, next) => {  // ミドルウェアを定義する。
                        const result = instance[method](req, res, next);
                        if (result["then"]) {
                            await result;
                        }
                    }

                    metadata.urls.forEach(item => {
                        const args = [item.path, ...metadata.before, middleware, ...metadata.after]; // 一連のミドルウェアを作成する
                        router[item.method].apply(router, args); // 一連のミドルウェアを登録する。
                    });
                });
            });
        });
    }
}

コードの参考はこちらです

最後に

今後、カスタムしたExpressJSの構成をもっと独立性が守られるように他のテクニックを当てはまる予定です。ExpressJSに興味があるとか素敵な技術を持っている方はぜひお待ちしてます!
次回は、DuyさんによるReSwiftについての記事です。

Flutterモバイルフレームワークの紹介

はじめに

みんなのマーケットでテックチームに所属しています、楊です。

今回は、Googleが最近発表したFlutterモバイルフレームワークを紹介します。
簡単に言ったら、一応Google版のReact Nativeという理解でいいと思います。

最初にFlutterのドキュメントを褒めてあげたいです。順調にインストールできました。
普段だとGoogleのドキュメント通り1ステップずつ実施して、最後にうまくいかず、stackoverflowで答えを探すのが個人的な日常です。

普段のGoogleドキュメントを参照しながら、開発する私:

f:id:curama-tech:20180330140440g:plain

試す狙い

今回くらしのマーケットで新規開発する「出店登録〜審査」の機能に対して、Tech Stack を選びたい。
独立の機能なので、独立で開発して、後はlibraryの形で主Appにintegrateしたらいいんじゃないかと思う(主AppはLibraryの入り口しか知らない、High aggregation and low coupling)。

A機能を開発する際に、Module A を作って、該当 Module を独立で実行できるようにすれば、Application A で爆速 compile & run できる(UIテストも簡単になる)。
最後リリースする際に、Main Application で各 Module を組み立てて、packageする。

Appアーキテクチャ:

f:id:curama-tech:20180330140530p:plain

要件

  • CrossPlatformの開発能力
  • うちのサポートしているデバイスやOSバージョンに合わせる
  • 既存のNative Appに集約できる
  • Material Designに優しいFrameworkなら、更によい。

結論

React Nativeより完成度や便利性がもっと高いFrameworkで、私の理想のようだけど、まだ本番アプリには使えない段階だ

理由1 - サポートしているデバイスやOSバージョン:

  • まだ 32 bit の iOS デバイスに対応していない(iPhone 4 や iPhone 5など)
  • Android sdk 16以上のみに対応している (うちは 14 から)

理由2 - 大事なWebViewはまだ実装していない:

  • ひとつのアプリの中にたまにnativeで実装したくない機能があると思う

理由3 - JSX よりもっとやばい書き方:

  • 目が痛い
    @override
    Widget build(BuildContext context) {
        return new Scaffold(
           appBar: new AppBar(
               backgroundColor: Theme.of(context).canvasColor,
               elevation: 0.0
           ),
           body: new Column(
               crossAxisAlignment: CrossAxisAlignment.stretch,
               children: <Widget>[
                   // Give the key-pad 3/5 of the vertical space and the display 2/5.
                   new Expanded(
                       flex: 2,
                       child: new CalcDisplay(
                           content: _expression.toString()
                       )
                   ),
                   const Divider(height: 1.0),
                   new Expanded(
                       flex: 3,
                       child: new KeyPad(calcState: this)
                   )
               ]
           )
       );
   }

理由4 - Dart 言語少し不安定(まだ進化中):

  • 今使っている Dart 1 はまるで no-JVM java Lite な感じがする(同じように、null safety がない、why!!!)
  • Kotlinの勉強を始めたばかり! !!

すばらしさ

  1. SDK レベルの Redux
  2. No more controllers / views / layouts / viewGroup / xml / xib / activity / fragment / delegate / resource
  3. Only ウィジェット / State / Reducer ひとつの ウィジェット を一ページとしてもいいし、複数のウィジェットを組み立てもいいし、すごくアジャイルである。
    つまり開発過程はレゴ玩具のように作れる。No More MVC, MVVM, MVP, VIPER(一部分のNative開発者はすごく嫌かも)。Reducer はまったくUIと関係がないし、Pure Fuction なので、ユニットテストは書きやすい。

  4. SDK レベルの Router 画面遷移はさらに簡単になる。WebView(もしあれば^_^||)やプッシュ通知からも簡単に Native ページを起動できる(urlと同じフォーマット)

  5. 既存のNative Appに集約できる

  6. ひとつのページに半分 Native 半分 Flutter も簡単に実現できて、お互いに通信できる(MessageChannelを通して)

  7. 豊富な UI フレームワーク 標準のUIフレームワークに Android の Material Design ウィジェット と iOS の Cupertino Style ウィジェットを両方提供している。 これで Android に Cupertino 風アプリ、 iOS に MD 風アプリを簡単に作れる。

Material Design:

f:id:curama-tech:20180330140628p:plain

Cupertino:

f:id:curama-tech:20180330140642p:plain

  1. iOS UIKit や Android View Package に依存していない React Native は JS を iOS と Android それぞれの Component に変換していく。
    これで大きな問題ふたつがある:システムの Component が修正したり、バグあったりのとき、React Native は待つことしかできない;また、両方統一していない Component は if else の分岐でわけて開発しないといけない。
    Flutterの方は全部の ウィジェット をゼロから実装したので(Skia の力)、Platform や OS とかかわらず、統一している。問題が発生しても、Google 側ですぐに解決ができる。
    もし React Native は Learn Once, Write Everywhere なら、 Flutter は Write Once, Deploy Everywhere だなぁ。

現実世界のReact Native(表から考えると小さな開発だけど、着手したら大きな罠が待っている):

f:id:curama-tech:20180330140657g:plain

  1. 仮想 DOM React Native と同じように、仮想 DOM を実装した。
    これによって State から行われる画面の更新が結構早くなるし、Hot Loading 機能も合わさってさらに開発の効率が上がる。

  2. 便利な auto layout iOS の Constraint Layout と Android の Layout 両方サポートしている(React Native は flexbox のみ)。

  3. Dart 言語に非同期処理がサポートされている Dart 1 には Future を使っていて、 Dart 2 には async await も追加していきました。
    (はい、Swiftさん、君のことに不満がある)

非同期処理:

f:id:curama-tech:20180330140858g:plain

最後の一言

実際数年前、同じような技術はすでに存在している。 Adobe の AIR っていうことだ。

みんなのマーケットでは、一緒に働いてみたいといった方をお待ちしてます。

次回は、Webエンジニアの記事です。

Implementing a simple pipeline with Airflow

Hi, this is Sushant from the Minma, Inc. tech-team and today, I would like to introduce you to Apache Airflow, an open-source workflow orchestration platform implemented by Airbnb. The aim of this post is to provide a tutorial on using Airflow to implement a simple pipeline.

The old days

Our company was founded in 2011. Back then, our system was very simple. We had one database supported by a single django server. Since our user base was very small, our requirements were simple, and thus, our scheduled batch jobs were also very straightforward. All we had were a few small python scripts scheduled by cron, and at that time, this was all we needed.

At curama.jp, we have a lot of scheduled jobs, from sending reminder emails to our users, updating statistics for our vendors (service providers), performing ETL for analytics, and so on.

As our company grew from supporting a few hundred vendors, to a few thousands, our batch jobs also started growing in complexity. As years went by, our scheduled jobs started to become more and more complicated and we started having a few troubles:

  • Logging and Profiling: Since our tasks were all scheduled by cron, we had to manually log in to the batch server, go through the logs and check if our tasks were running as they were supposed to. Since we didn't have a built-in logging system, we had to code the logging commands in each of our scripts manually. And with no profiling feature, we also had no idea how long our batch jobs were taking every run.

  • Notification on failure: When we were using cron, we had not designed our scripts to send notifications when a job failed. We had to check our logs frequently to make sure our batch jobs weren't failing. It was time consuming and inefficient to go through our logs daily just to make sure nothing was going wrong.

  • Task Dependencies: Some of our tasks had dependencies amongst them. We had dependencies like "Task A must run only if Task B succeeds", "Task F should run when Tasks D, E and F succeed", "it's ok to run Tasks X, Y, Z" in parallel, "if Condition A holds true, run task M, else run task N", etc. With our cron jobs, we had to program these dependencies inside the scripts and as a result, our code was becoming very complex by each line of code.

  • Scalability: All our cron tasks were run in a single EC2 instance. To make sure two jobs didn't fight for resources, we manually scheduled all of our jobs such that no two job runs overlapped. As the number of batch jobs started growing, it was becoming harder to keep this constraint intact.

After searching for a tool that would make managing our batch jobs easier, we stumbled across Airbnb's open source workflow management platform, Apache Airflow.

Quoting the Apache Airflow docs:

Airflow is a platform to programmatically author, schedule and monitor workflows.

Use airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.

Apache Airflow solved most of our problems out-of-the-box. The built-in logging and profiling helped us monitor our tasks and made debugging easier if something went wrong. The rich user interface allowed us to visualize tasks and their dependencies, the customization options allowed us to easily send notifications when something unexpected happened, support for Celery gave us the option of scaling, if we ever found that running on a single machine was reaching its limits.

Airflow Concepts:

a. DAG (Directed Acyclic Graph):

A DAG is the unit of a single workflow in Airflow. It represents a graph of related tasks required to accomplish a certain job.

b. Operators & Tasks:

Operators are Classes that dictate how something gets done in a DAG.

  • BashOperator: Executes a bash command
  • PostgresOperator: Executes a SQL command or script in Postgres
  • PythonOperator: Executes a python callable (function)

An instance of an Operator is called a Task. Two task instances may use same Operator but, they are never the same.

Implement a simple pipeline using Airflow

To illustrate how easy it is to build a pipeline with airflow, we are going to build a simple pipeline for creating a hawaiian pizza.

At the end of this tutorial we will have implemented a DAG (workflow) that looks something like this. f:id:curama-tech:20180323123025p:plain

The graph in the picture represents a DAG and all the small rectangular boxes are tasks. As you can see, just by looking at the DAG, we can figure out how the different tasks combine to complete the pipeline.

For our simulation, we will make the following assumptions:

  1. A pizza in our example is a text file.
  2. At every step in the pipeline, some text will be appended to the file.

The point is that at each stage of the pipeline, the corresponding operator performs some action on the pizza, and once all the steps are complete, our pizza (text file) will have the desired crust, sauce, toppings, etc.

Setup Airflow:

  • Create a directory for the tutorial.

    > mkdir ~/airflow_tutorial

  • Specify a home directory for airflow.

    > export AIRFLOW_HOME=~/airflow_tutorial

  • Install airflow

    > pip install apache-airflow

  • Initialize airflow

    > cd ~/airflow_tutorial

    > airflow initdb

  • Start the webserver

    > airflow webserver -p 8080

    Once the webserver starts, if you go to http://localhost:8080, you should be able to see a screen similar to this.

    f:id:curama-tech:20180323123049p:plain

    This page shows all the tasks managed by airflow. The ones shown by default are the example DAGs packaged with airflow. You can disable these by changing the load_examples setting to False in airflow.cfg file.

  • Configuration options

    When you first use the airflow init command, airflow creates an airflow.cfg file in the AIRFLOW_HOME directory. There are various configuration options available to fit different needs. For more detailed configuration options check out this link. For this example, we are just going to go with the default configuration.

Setup Directories

For this example we are going to store our dags in ~/airflow_tutorial/dags directory. Make sure that the dags_folder setting in airflow.cfg is set to the full path of ~/airflow_tutorial/dags. Then create the dags directory as follows: mkdir ~/airflow_tutorial/dags touch ~/airflow_tutorial/dags/__init__.py

We are going to create custom operators to create our pizza. So lets go ahead and create an operators directory inside ~/airflow_tutorial/dags.

> cd ~/airflow_tutorial/
> mkdir dags/operators
> touch ~/airflow_tutorial/dags/operators/__init__.py

Create pizza

First, let's think about what we need to create our Hawaiian pizza. For this example, we are going to create a thin-crust hawaiian pizza topped with mozzarella cheese, ham, mushroom and pineapple. So, we are going need a crust, some tomato sauce, toppings like ham, cheese, pineapple, and finally an oven to bake our pizza. To build our pizza pipeline, we are going to implement operators for each of the above.

  • ThinCrustOperator to create our thin crust.
  • TomatoSauceOperator to apply tomato sauce to our crust.
  • MozzarellaCheeseToppingOperator, HamToppingOperator, MushroomOperator and PineappleToppingOperator to add toppings to our pizza.
  • OvenOperator to bake our pizza.

Let's start with creating our ThinCrustOperator!

  1. Create a file named crusts.py inside the operators directory.
  2. Add the following lines to crusts.py.
import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import datetime

class ThinCrustOperator(BaseOperator):
    ui_color = "#e8d4ad"   # Hex color code for visualizing the Operator. 

    @apply_defaults
    def __init__(
            self,
            pizza,
            *args, **kwargs):
        self.pizza = pizza
        super(ThinCrustOperator, self).__init__(*args, **kwargs)
        

    def execute(self, context):
        message = "[{}] Creating thin crust.......DONE\n".format(datetime.datetime.now().isoformat())
        with open(self.pizza + ".txt", "a+") as f:
            f.write(message)
        logging.info(message)
        

The code above is the basic structure of an Operator. All operators in Airflow are derived from the BaseOperator class or its children. All Operators must have an execute method. This method is called when the task (remember that a task is an instance of an Operator) is run.

All our ThinCrustOperator does is open the text file (pizza in our case) and appends the text "Creating thin crust.......DONE" to it.

Note: The ui_color attribute is the hex color code for the operator. When we visualize our dags in the Airflow webserver, any task that is an instance of the ThinCrustOperator class will have that color.

Similarly, we create our TomatoSauceOperator class with code that looks like this:

import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import datetime

class TomatoSauceOperator(BaseOperator):
    ui_color = "#d83c0e"    # Red for tomato sauce. 

    @apply_defaults
    def __init__(
            self,
            pizza,
            *args, **kwargs):
        self.pizza = pizza
        super(TomatoSauceOperator, self).__init__(*args, **kwargs)
        

    def execute(self, context):
        message = "[{}] Applying tomato sauce.......DONE\n".format(datetime.datetime.now().isoformat())
        with open(self.pizza + ".txt", "a+") as f:
            f.write(message)
        logging.info(message)
        

As you probably have guessed, we create all the remaining operators (MozzarellaCheeseToppingOperator, HamToppingOperator, MushroomOperator, etc) with similar code. So, go ahead and implement the remaining operators similarly, and then we shall combine these operators to build our pizza pipeline.

To create our pipeline, we first need to create a file that contains our DAG object. So let's first create a file called create_hawaiian_pizza.py inside our dags folder.

A pipeline in Airflow is a DAG (Directed Acyclic Graph). Creating a DAG requires the following steps:

  • Create a DAG instance Inside create_hawaiian_pizza.py, add the following code.

    ```python from airflow import DAG from datetime import datetime, timedelta

    default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 3, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 0, 'retry_delay': timedelta(minutes=5) }

    create_hawaiian_pizza_dag = DAG( 'CreateHawaiianPizza',
    default_args=default_args, schedule_interval=None ) ``` A DAG instance is created from the DAG class from Airflow. This is an object that represents a single pipeline. All tasks inside the pipeline are associated to this object. For scheduling options, check out this link.

  • Add tasks to the DAG instance

To associate tasks to our pipeline, we need to instantiate our operators.

    pizza_instance = "my_hawaiian_pizza"

    prepare_crust = ThinCrustOperator(
        task_id="prepare_crust",
        pizza=pizza_instance,
        dag=create_hawaiian_pizza_dag
    )
    apply_tomato_sauce = TomatoSauceOperator(
        task_id="apply_tomato_sauce",
        pizza=pizza_instance,
        dag=create_hawaiian_pizza_dag    
    )
    add_cheese = MozzarellaCheeseToppingOperator(
        task_id="add_cheese",
        pizza=pizza_instance, 
        dag=create_hawaiian_pizza_dag
    )
    add_ham = HamToppingOperator(
        task_id="add_ham",
        pizza=pizza_instance,
        dag=create_hawaiian_pizza_dag
    )
    add_pineapple = PineappleToppingOperator(
        task_id="add_pineapple",
        pizza=pizza_instance,
        dag=create_hawaiian_pizza_dag
    )
    add_mushroom = MushroomToppingOperator(
        task_id="add_mushroom",
        pizza=pizza_instance,
        dag=create_hawaiian_pizza_dag
    )
    bake_pizza = OvenOperator(
        task_id="bake_pizza",
        pizza=pizza_instance,
        dag=create_hawaiian_pizza_dag
    )

Notice that we are passing the instance of our DAG (create_hawaiian_pizza_dag) to the operator. This is required in order to associate all our tasks to the DAG instance. Once we have created our tasks (instances of Operators), we need to set up the dependencies between then.

Remember, we want to build a pipeline that looks like this: f:id:curama-tech:20180323123025p:plain

So, let's first set up a few rules!

  1. Since we need a crust before we can apply sauce, apply_tomato_sauce can only be executed after prepare_crust is complete.
  2. We want to apply our toppings only after tomato sauce is applied to the pizza. Therefore, add_cheese, add_ham, add_pineapple and add_mushroom can only be executed after apply_tomato_sauce is complete. For the toppings, we don't care about the order in which the toppings are applied. Since one topping is not dependent on the other, we can run add_cheese, add_ham, add_pineapple and add_mushroom in parallel.
  3. We can only bake the pizza after all the toppings are applied. Therefore, bake_pizza can only be executed after ALL of add_cheese, add_ham, add_pineapple and add_mushroom have completed.

Each of these rules are implemented as follows:

Rule 1

The dependency for rule no. 1 is set as follows:

prepare_crust.set_downstream(apply_tomato_sauce)

OR

apply_tomato_sauce.set_upstream(prepare_crust)

Both the above statements perform the same function. Additionally, to make coding easier, Airflow provides some syntactic sugar for the set_downstream and set_upstream functions. We can write the same code above as follows:

# Same as prepare_crust.set_downstream(apply_tomato_sauce)
prepare_crust >> apply_tomato_sauce

# Same as apply_tomato_sauce.set_upstream(prepare_crust)
apply_tomato_sauce << prepare_crust
Rule 2

Implementing the order for apply_tomato_sauce and the toppings is easily applied using the same >> operator as rule 1.

apply_sauce >> add_cheese
apply_sauce >> add_ham
apply_sauce >> add_pineapple
apply_sauce >> add_mushroom

The >> and << operators also take lists. So the same code above can also be written as follows:

add_toppings = [add_cheese, add_ham, add_pineapple, add_mushroom]
apply_sauce >> add_toppings
Rule 3

Finally, to make sure bake_pizza is executed after all the toppings are added, we could do something like this:

# Using the same add_toppings list from rule 2.
add_toppings >> bake_pizza

In the end, our create_hawaiian_pizza.py file should look something like this:

from airflow import DAG
from datetime import datetime, timedelta
from operators.crusts import ThinCrustOperator
from operators.oven import OvenOperator
from operators.sauces import TomatoSauceOperator
from operators.toppings import (
    MozzarellaCheeseToppingOperator,
    HamToppingOperator,
    MushroomToppingOperator,
    PineappleToppingOperator
)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 3, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}

create_hawaiian_pizza_dag = DAG('CreateHawaiianPizza',  default_args=default_args, schedule_interval=None)

pizza_instance = "my_hawaiian_pizza"

prepare_crust = ThinCrustOperator(
    task_id="prepare_crust",
    pizza=pizza_instance,
    dag=create_hawaiian_pizza_dag
)

apply_sauce = TomatoSauceOperator(
    task_id="apply_tomato_sauce",
    pizza=pizza_instance,
    dag=create_hawaiian_pizza_dag
)

add_cheese = MozzarellaCheeseToppingOperator(
    task_id="add_cheese",
    pizza=pizza_instance,
    dag=create_hawaiian_pizza_dag
)
add_ham = HamToppingOperator(
    task_id="add_ham",
    pizza=pizza_instance,
    dag=create_hawaiian_pizza_dag,
)
add_pineapple = PineappleToppingOperator(
    task_id="add_pineapple",
    pizza=pizza_instance,
    dag=create_hawaiian_pizza_dag
)
add_mushroom = MushroomToppingOperator(
    task_id="add_mushroom",
    pizza=pizza_instance,
    dag=create_hawaiian_pizza_dag
)

bake_pizza = OvenOperator(
    task_id="bake_pizza",
    pizza=pizza_instance,
    dag=create_hawaiian_pizza_dag
)

add_toppings = [add_cheese, add_ham, add_mushroom, add_pineapple]
prepare_crust  >> apply_sauce >> add_toppings >> bake_pizza

And that's it! We have built our pizza pipeline. Let's check to see if it works!

Make sure the webserver and scheduler are both running. Go to http://localhost:8080 and if everything goes well, our new dag should appear in the DAGs list. f:id:curama-tech:20180323123151p:plain

Let's try running the DAG. To do so, just click on the Trigger DAG button in the Links column.

To visualize the pipeline, CreateHawaiianPizza DAG, and in the next screen, click on Graph View to visualize the pipeline. f:id:curama-tech:20180323123205g:plain

Once the DAG run is complete, we should see a my_hawaiian_pizza.txt file inside our AIRFLOW_HOME directory. f:id:curama-tech:20180323123358p:plain

To test what happens when a task fails, let's change the ThinCrustOperator so that it throws an error like so:

def execute(self, context):
        raise Exception("Some error message here!!")
        message = "[{}] Creating thin crust.......DONE\n".format(datetime.datetime.now().isoformat())
        with open(self.pizza + ".txt", "a+") as f:
            f.write(message)
        logging.info(message)

Now, if we run our DAG again, we should see that the task fails and that the exception is logged properly.

f:id:curama-tech:20180323123413p:plain

The red line around the task means that the task has failed. Now, if we click on the task and click on View Log, we should see the exception logged like this: f:id:curama-tech:20180323123606p:plain

At curama.jp, we have been using Airflow for over a year now, we have been really satisfied with the results. Since our system is still not that complex, we have still to use Airflow to its fullest capacity. But so far, Airflow as a workflow management tool looks really promising.

This was just a simple example to demonstrate how easy it is to implement a simple workflow with Airflow. For a more in-depth look, please check out the Airflow documentation. If you are having problems using airflow, asking questions here could point you in the right direction.

PS: We are hiring! For those interested, feel free to apply here. At least conversational Japanese proficiency is required.

Kibanaにログイン機能を実装してみた

はじめに

みんなのマーケットでSREチームに所属しています、千代田です。
記事を書くのは2回目ですので、もし良ければ、
前回の記事(Prometheusを用いたSupervisor上のプロセス監視)も読んでみてください。

Kibanaとは

Kibana is an open source data visualization plugin for Elasticsearch.
It provides visualization capabilities on top of the content indexed on an Elasticsearch cluster.
Users can create bar, line and scatter plots, or pie charts and maps on top of large volumes of data. wikipedia

要約すると、
Elasticsearchのデータに対して可視化/分析などを行うことができるツールです。

Kibanaにログイン機能を実装できるプラグインの例

  • X-Pack
    公式から出されているプラグインです。
    機能としては、Security、Alerting、Monitoring、Reporting、Graph、Machine Learningなど幅広くカバーされています。
    こちらのライセンスモデルは、BASIC、GOLD、PLATINUM、ENTERPRISEがあります。
    ログイン機能が使えるのはGOLD以上のライセンスとなります。
    また、無償のトライアルでも30日ほど試すことができます。

  • Search Guard
    今回はこちらのプラグインを使用しています。
    機能としては、Securityに特化しています。
    こちらのライセンスモデルは、Community、Enterprise、Complianceがあります。
    今回実装したいのは、ログイン機能のみなのでCommunityを利用します。

今回の構成

最新バージョンを使用する場合(マイナーバージョンを含む)は、
適宜読み替えてください。

  • AWS: ec2(proxy server + elastic server)
  • Elasticsearch: 5.6.7
  • Logstash: 5.6.7
  • Kibana: 5.6.7
  • SearchGuard: 5.6.7-6

ELKスタック導入

導入済みを前提としているため、割愛します。

本記事における各アプリケーションとディレクトリは次のとおりです。

アプリケーション ディレクトリ
Elasticsearch /opt/elasticsearch
Logstash /opt/logstash
Kibana /opt/kibana

Search Guard導入

公式のドキュメントを読んだ上で以下を読み進めていくことを推奨します。

ElasticsearchにSearch Guardのプラグインをインストールします

cd /opt/elasticsearch
bin/elasticsearch-plugin install -b com.floragunn:search-guard-5:5.6.7-19

TLSのセットアップ

複数の作成方法がサポートされています。
詳しくは公式のTLS Setup > Generating Cerificatesの章を確認してください。
今回は公式で用意されているオフライン用のスクリプトを使用しますが、必要に応じて変更してください。

mkdir /tmp/search-guard-tlstool-1.1
cd /tmp/search-guard-tlstool-1.1 
wget https://search.maven.org/remotecontent?filepath=com/floragunn/search-guard-tlstool/1.1/search-guard-tlstool-1.1.tar.gz -O search-guard-tlstool-1.1.tar.gz
tar xvzf search-guard-tlstool-1.1.tar.gz
rm -rf search-guard-tlstool-1.1.tar.gz
cd ..
sudo mv search-guard-tlstool-1.1 /opt/elasticsearch/

TLSのコンフィグを作成します

cd /opt/elasticsearch/search-guard-tlstool-1.1/config
vim tlsconfig.yml

必要に応じて値を変更してください。

ca:
   root:
      dn: CN=root.ca.example.com,OU=CA,O=Example Com\, Inc.,DC=example,DC=com
      keysize: 2048
      validityDays: 3650
      pkPassword: auto
      file: root-ca.pem
defaults:
      validityDays: 3650
      pkPassword: auto
      generatedPasswordLength: 12

      # 今回は外部との暗号化をproxy serverで実装しているため、falseにしてあります。
      # Set this to true in order to generate config and certificates for
      # the HTTP interface of nodes
      httpsEnabled: false
nodes:
  - name: node1
    dn: CN=node1.example.com,OU=Ops,O=Example Com\, Inc.,DC=example,DC=com
    dns: node1.example.com
    ip: 10.0.2.1
clients:
  - name: admin
    dn: CN=admin.example.com,OU=Ops,O=Example Com\, Inc.,DC=example,DC=com
    admin: true

sgtlstool.shを実行します

tlsconfig.ymlをもとに各種PEM、KEYを生成します。

cd /opt/elasticsearch/search-guard-tlstool-1.1/
./tools/sgtlstool.sh -c ./config/tlsconfig.yml -ca -crt
cd out
cp ./* /opt/elasticsearch/config/

elasticsearch.ymlを設定します

node1_elasticsearch_config_snippet.ymlの内容を既存のelasticsearch.ymlに追記します。

cd /opt/elasticsearch/config/
chown elasticsearch:elasticsearch node1.pem node1.key root-ca.pem
vim elasticsearch.yml
## 省略
searchguard.ssl.transport.pemcert_filepath: node1.pem
searchguard.ssl.transport.pemkey_filepath: node1.key
searchguard.ssl.transport.pemkey_password: PASSWORD
searchguard.ssl.transport.pemtrustedcas_filepath: root-ca.pem
searchguard.ssl.transport.enforce_hostname_verification: false
searchguard.ssl.transport.resolve_hostname: false
searchguard.ssl.http.enabled: false
searchguard.nodes_dn:
- CN=node1.example.com,OU=Ops,O=Example Com\, Inc.,DC=example,DC=com
searchguard.authcz.admin_dn:
- CN=admin.example.com,OU=Ops,O=Example Com\, Inc.,DC=example,DC=com

Elasticsearchを起動します

service elasticsearch start

sgconfigの設定をします

cd /opt/elasticsearch/plugins/search-guard-5/sgconfig/

sg_config.yml

今回は特に必要がないので、以下を除いてほかはすべてコメントアウトします。

searchguard:
  dynamic:

sg_roles.yml

既存の設定によって変更箇所が異なります。
使用しているindex名を変更しているなどある場合は、必要に応じて適切なパーミッションを設定してください。

sg_internal_users.yml

今回はadmin、kibanaserver、logstashを使用するため、その三種類を残します。

admin:
  hash: HASH
logstash:
  hash: HASH
kibanaserver:
  hash: HASH

HASHの箇所に設定するパスワードハッシュの生成方法も載せておきます。

cd /opt/elasticsearch/plugins/search-guard-5/tools
chmod u+x hash.sh
./hash.sh 
[Password:] パスワード入力
パスワードハッシュが返ってきます

sg_action_groups.yml

今回は特にいじる必要がない為触れませんが、特別なパーミッションをグループにしたい場合は変更してください。

sg_roles_mapping.yml

今回はadmin、kibanaserver、logstashを使用するため、その三種類を残します。

sg_all_access:
  users:
    - admin

sg_logstash:
  users:
    - logstash

sg_kibana_server:
  users:
    - kibanaserver

以上で、sgconfig関連の設定が完了です。

sgadmin.shを実行します

Elasticsearchにsgconfigの設定を反映します。
admin.keyのパスワードは/opt/elasticsearch/search-guard-tlstool-1.1/client-certificates.readmeに書いてあります。

cd /opt/elasticsearch/plugins/search-guard-5/tools/
./sgadmin.sh -cd ../sgconfig/ -icl -nhnv -cacert /opt/elasticsearch/config/root-ca.pem -cert /opt/elasticsearch/config/admin.pem -key /opt/elasticsearch/config/admin.key -keypass admin.keyのパスワード -h elasticsearch.host

Elasticsearchの確認をします

curl --insecure -u admin:sg_internal_usersのadminパスワード 'https://elastic.host:9200/_searchguard/authinfo?pretty'

KibanaにSearch Guardのプラグインをインストールします

cd /tmp
wget https://github.com/floragunncom/search-guard-kibana-plugin/releases/download/v5.6.7-6/searchguard-kibana-5.6.7-6.zip
cd /opt/kibana
bin/kibana-plugin install file:///tmp/searchguard-kibana-5.6.7-6.zip

kibana.ymlを設定します

cd /opt/kibana/config/
vim kibana.yml
## 省略
elasticsearch.username: "kibanaserver"
elasticsearch.password: "sg_internal_users.ymlのHASH前のkibanaのパスワード"
elasticsearch.ssl.verificationMode: none

Kibanaを起動します

service kibana start

logstash.ymlを設定します

cd /opt/logstash/config/
vim logstash.yml
## 省略
output {
    elasticsearch {
        hosts => ["elastic.host"]
        user => logstash
        password => sg_internal_users.ymlのHASH前のlogstashのパスワード
    }
}

Logstashを起動します

service logstash start

Kibanaのログイン画面を確認します

adminユーザーでログインします。
f:id:curama-tech:20180316111911p:plain

ログイン後は、普段どおりに使うことができます。

Search Guardを無効にする

elasticsearch.ymlに追記した箇所をコメントアウトしたうえで、searchguard.disabled: trueを指定してください。
kibana.ymlに追記した箇所をコメントアウトしたうえで、searchguard.basicauth.enabled: falseを指定してください。
logstash.ymlに追記した箇所をコメントアウトしてください。

削除をする場合は、公式の手順にしたがってください。

感想

以上で、簡単にログイン機能を実装できました。
公式のドキュメントを読むと分かるのですが、
設定がかなり細かくできるのでユーザーごとに分けるのが良さそうです。
また、一度パーミッションを決めた後に変更をする場合も再度sgadmin.shを実行するだけでよいといった手軽さもあります。
ログイン機能を実装するか迷っている方がいらっしゃれば一度試してみることをお勧めします!

次回は、WebエンジニアによるAirflowについての記事です!
また、一緒に働いてみたいといった方もぜひお待ちしてます!

Amazon Connect使ってみた!

CTOの戸澤です。 今回は、昨年AWSから発表されたAmazon Connectを使ってIVRを構築してみます。

f:id:curama-tech:20180309105522p:plain

Amazon Connectとは

Amazon Connectは、AWS上でコールセンターのシステム(IVR,PBX)を構築できるサービスです。 クラウドなのでスケールしやすく、使った分だけの従量課金です。 AWSのLexやLambdaとの連携、S3に録音を保存できるなどAWSの他のサービスを活用したシステムの構築ができます。

参考: Amazon Connect(簡単に使えるクラウド型コンタクトセンター)|AWS

利用にはAWS Console上で問い合わせフローを構築する必要があります。構築はGUIでドラッグ&ドロップするだけです。 インバウンド、アウトバウンド双方の通話をブラウザ上で動作するソフトフォンを使ってできます。

Amazon Connectは、2018/03/07現在、EU, バージニア、オレゴン、シドニーリージョンの4リージョンに展開されています。 東京リージョンには来ていませんが、シドニーリージョンで日本の050番号が取得できるようですので、今回はシドニーリージョンを使って試してみます。

なぜ電話システム(IVR)が必要なのか

みんなのマーケットには、コンサルティング本部という部署があり、くらしのマーケットのカスタマーサポートと出店者へのコンサルティングを行っています。

基本的には、メールでやりとりをしますが、出店者へのコンサルティング、カスタマーサポートは電話で行うことも多くあります。お客様と出店者それぞれ担当するスタッフが異なるため、問い合わせフローを使って振り分ける必要があります。

構築

Amazon Connectのウィザードが充実しているので、基本的にそれにしたがって構築していけます。

今回は簡単なIVRを作ってみます。 コールフローはインバウンドとアウトバウンドの双方に対応でき、簡単に

  • インバウンド: カスタマーからの着信 -> 対応可能時間の判定 -> ルーティング -> キューイング -> 担当者が受話
  • アウトバウンド: 担当者が発信 -> カスタマーが受話

という流れになります。

1.電話番号の取得まで

ウィザードにしたがって、

f:id:curama-tech:20180309105537p:plain

  1. インスタンスの作成
  2. 管理者の設定
  3. テレフォニーの設定(インバウンド、アウトバウンドの通話を許可するか)
  4. データストレージの設定(録音とログを保存するS3 Bucketを指定)

を進めます。

次に電話番号を取得します。 今回は日本の050番号を取得します。 050番号の他に、フリーダイヤルの0800番号も取得できます。

f:id:curama-tech:20180309105551p:plain

電話番号の取得ができたら、さっそく通話のテストをしてみます。 携帯から今回取得した050番号に電話してみると、携帯とAmazon Connect(ブラウザ)の間で通話ができました。

f:id:curama-tech:20180309105602p:plain

2.問い合わせフローで使う各種設定

問い合わせフローを構築する前に、問い合わせフローで使うオペレーション時間(電話対応する時間帯と曜日)、エージェントが受話するまでに待機するキュー、プロンプト(音声による案内)の登録を先に行います。

オペレーション時間

今回は、BusinessHoursという名前で新規にオペレーション時間を作成します。 タイムゾーンはJapan, すべての曜日、開始AM9時、終了PM5時に設定します。

f:id:curama-tech:20180309105617p:plain

キュー

InboundQueueという名前で新規にキューを作成します。 オペレーション時間をさきほど作成した、BusinessHours, アウトバウンド発信者ID番号を取得した050番号に、ID名を適当に設定します。

f:id:curama-tech:20180309105626p:plain

プロンプト

今回は、コールフローから次のプロンプトを登録しました。

  1. 「お電話ありがとうございます。」
  2. 「IVRのテストです。お問合わせの場合は1を、終了する場合は9を押してください。」
  3. 「只今の時間は営業時間外です。平日の9時から17時のあいだにあらためてお掛け直しをお願いします。」
  4. 「エラーが発生しました。申し訳ありませんが、しばらく経ってからおかけ直しください。」
ルーティングプロファイルとユーザーへの割り当て

ルーティングプロファイルを作成します。 ルーティングプロファイルは、エージェントが対応するキューの設定です。 InboundRoutingProfileという名前で新規に作成します。 キューはさきほど作成したInboundQueueを設定します。

f:id:curama-tech:20180309105638p:plain

現在、このインスタンスでは唯一の管理者だけがユーザーとして登録されています。インバウンド通話はこのユーザーがエージェントとなって受けるため、管理者のルーティングプロファイルを今回作成したInboundRoutingProfileに変更し、InboundQueueの対応ができるように設定します。

f:id:curama-tech:20180309105647p:plain

ここまで、必要な設定は完了しました。 次に問い合わせフローを構築していきます。

3.問い合わせフローを作成

今回はテンプレートを使わずに、ゼロから構築してきます。

f:id:curama-tech:20180309105703p:plain

フロー図のとおりですが、キューの設定 -> オペレーション時間の確認 -> 番号入力で分岐 -> キューへ転送 -> エージェントが受話 という流れに、各エラーハンドリングが付属している状態です。 この問い合わせフローを保存します。

さきほど取得した050番号はデフォルトでは別の問い合わせフローと紐付いているので、今回作成した問い合わせフローに紐付きを変更します。

f:id:curama-tech:20180309105716p:plain

ここまでの問い合わせフローの「保存」だけでは、着信しても使える状態になっていません。 最後に、問い合わせフローの「保存して発行」をしてデプロイします。

ここまで、IVRの構築が完了しました。

電話してみる

実際に使えるか試してみます。 Amazon Connectの管理画面 右上にある電話マークをクリックして、Contact Control Panel(CCP)を開き、Availableの状態にして待機しておきます。

1.インバウンド

携帯からAmazon Connectの050番号へ発信してみます。

f:id:curama-tech:20180309105728p:plain

f:id:curama-tech:20180309105737p:plain

プロンプトが再生され、1番を押下し、CCPから受話ができました。

2.アウトバウンド

f:id:curama-tech:20180309105747p:plain

Amazon Connectから携帯にアウトバウンドの電話をしてみます。 CCPのダイヤルパッドを使って、発信できました。 ただ、携帯(ドコモ)では、着信時に非通知設定になり050番号は表示されませんでした。

今後に期待すること

1. オペレーション時間に祝日を設定できる

オペレーション時間の設定では、曜日と時間帯別に対応の可否を設定できます。現在、利用している電話システムでは、祝日は対応時間外に設定できますが、それと同等の機能ができるとたいへん便利です。 とりあえずは、手動で祝日前に曜日単位で対応外に設定することで対応できそうです。(祝日明けに元に戻すのを忘れないようにする必要があります。また、Amazon Connectは、Lambdaと連携できるのでLambdaで祝日判定をすることもできそうです。)

2. 固定電話の番号を取得できる

現在、シドニーリージョンの日本の電話番号では050番号と通話料無料の0800番号が使えるようです。 フリーダイヤルの0120は場合によっては、東京03〜のような固定電話の番号でないとつながらない場合があります。

3. 番号通知してアウトバウンド発信できる

Amazon Connectからアウトバウンドの発信をするとドコモでは、非通知になりました(2018/03/08現在)。Qiitaの記事によると、番号通知は対応中のようです。

参考: Amazon Connect で非通知電話ではなく、番号通知で電話をかける方法 - Qiita

まとめ

  • Amazon ConnectでIVRを簡単に構築できる
  • アウトバウンドが非通知の表示になるので注意

みんなのマーケットでは、一緒に働いてみたいといった方をお待ちしてます。

次回は、SREエンジニアの記事です。