2021/08/12

Rubyによる負荷テストツールの実装

負荷テストツールの内部を、開発会社らしく実装しながら何度かに分けて紹介したいと思います。
HedgehogはGo言語による実装ですが、より雰囲気が掴みやすいRubyを採用してみました。

目次

  • シンプルリクエスト
  • 多重リクエスト
  • 全体の性能を測定する
  • 繰り返し制御
  • 秒間リクエスト制限
  • ユーザ変数とセッション

Rubyによる負荷テストツールの実装|JSONのシナリオ事例

シンプルリクエスト

まずは単純な実装から始めていきましょう。
シナリオファイル(JSON)を読みこんで指定のエンドポイントにリクエストするだけのプログラムです。
シナリオの定義は以下とします。

{
  "requests": [
    {
      "method": "GET",
      "url": "http://localhost:4567/"
    },
    {
      "method": "GET",
      "url": "http://localhost:4567/login"
    },
    {
      "method": "POST",
      "url": "http://localhost:4567/login",
      "form_parameters": {
        "name": "login_id",
        "password": "password"
      }
    }
  ]
}

リクエストを列挙できるのであればフォーマットは何でも良いと思います。
これを読み込んでリクエストするのが以下です。

#!/usr/bin/env ruby

require 'json'
require 'net/http'
require 'pathname'
require 'time'

class Loader
  attr_reader :scenario

  def initialize(path)
    abort "シナリオのパスを指定してください。" unless path
    scenario_path = Pathname.new(path)
    abort "パスが見つかりません: #{scenario_path}" unless scenario_path.exist?
    @scenario = JSON.parse(scenario_path.read)
  end

  def run
    info_log("試験を開始します。")
    scenario["requests"].each do |req|
      case req["method"]
      when "GET"
        get(req["url"])
      when "POST"
        post(req["url"], req["form_parameters"])
      end
    end
    info_log("試験を終了します。")
  end

  private

  def info_log(msg)
    now = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")
    puts "#{now} [LOADER] #{msg}"
  end

  def get(url)
    info_log("GET: #{url}")
    request(Net::HTTP::Get, url, nil)
  end

  def post(url, params)
    info_log("POST: #{url}")
    request(Net::HTTP::Post, url, params)
  end

  def request(method_class, url, params = nil, headers = nil)
    uri = URI.parse(url)
    http = Net::HTTP.new(uri.host, uri.port)

    if uri.scheme == 'https'
      http.use_ssl = true
    end

    req = nil
    if method_class == Net::HTTP::Get
      if params
        uri.query = URI.encode_www_form(params)
      end
      req = method_class.new(uri)
    else
      req = method_class.new(uri.path)
      if params
        req.set_form_data(params)
      end
    end

    headers&.each do |key, value|
      req[key] = value
    end

    http.request(req)
  end

end

Loader.new(ARGV.shift).run

現時点でどこまで機能追加するか決まっていないですが、できるだけ拡張しやすいようにアプリケーション本体をクラスにしています。
引数で受け取ったシナリオJSONを読み込み、指定されたエンドポイントに対してリクエストします。


実行結果
210802-loader1

いい感じですね。
ただし、これだと1ユーザのリクエストしかエミュレートできていませんので、指定したユーザ数分だけリクエストを実行するように変更してみましょう。

多重リクエスト

シナリオにユーザ数を指定する項目usersを追加します。

{
  "users": 5,
  "requests": [
    {
      "method": "GET",
      "url": "http://localhost:4567/"
    },
    {
      "method": "GET",
      "url": "http://localhost:4567/login"
    },
    {
      "method": "POST",
      "url": "http://localhost:4567/login",
      "form_parameters": {
        "name": "login_id",
        "password": "password"
      }
    }
  ]
}

あわせてプログラムを修正します。
usersの数と同数のスレッドを起動し、各スレッドでリクエストを実行します。

#!/usr/bin/env ruby

require 'benchmark'
require 'json'
require 'net/http'
require 'pathname'
require 'time'

class Loader
  attr_reader :scenario

  def initialize(path)
    abort "シナリオのパスを指定してください。" unless path
    scenario_path = Pathname.new(path)
    abort "パスが見つかりません: #{scenario_path}" unless scenario_path.exist?
    @scenario = JSON.parse(scenario_path.read)
  end

  def run
    info_log("試験を開始します。")
    threads = scenario["users"].times.map do |i|
      Thread.new(i) do |index|
        vu = VirtualUser.new(index)
        scenario["requests"].each do |req|
          case req["method"]
          when "GET"
            vu.get(req["url"])
          when "POST"
            vu.post(req["url"], req["form_parameters"])
          end
        end
      end
    end
    threads.each(&:join)
    info_log("試験を終了します。")
  end

  private

  def info_log(msg)
    now = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")
    puts "#{now} [LOADER] #{msg}"
  end

  class VirtualUser
    attr_reader :id

    def initialize(id)
      @id = id
    end

    def get(url)
      request(Net::HTTP::Get, url, nil)
    end

    def post(url, params)
      request(Net::HTTP::Post, url, params)
    end

    def request(method_class, url, params = nil, headers = nil)
      uri = URI.parse(url)
      http = Net::HTTP.new(uri.host, uri.port)

      if uri.scheme == 'https'
        http.use_ssl = true
      end

      req = nil
      method = nil
      if method_class == Net::HTTP::Get
        method = "GET"
        if params
          uri.query = URI.encode_www_form(params)
        end
        req = method_class.new(uri)
      else
        method = "POST"
        req = method_class.new(uri.path)
        if params
          req.set_form_data(params)
        end
      end

      headers&.each do |key, value|
        req[key] = value
      end

      code = nil
      t = Benchmark.realtime do
        res = http.request(req)
        code = res.code
      end
      ms = (t * 1000).round(3)
      info_log("#{method}: #{url} (#{code}, #{ms}ms)")
    end

    private

    def info_log(msg)
      now = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")
      puts "#{now} [LOADER][#{id}] #{msg}"
    end
  end

end

Loader.new(ARGV.shift).run

ログにどのユーザ(=スレッド)かを表示したいので、リクエスト部分はVirtualUserクラスとして定義しなおし、インデックスをユーザ識別子として渡すようにしました。
ついでに、リクエスト結果としてレスポンスコード、レスポンスタイムも表示するようにしました。


実行結果

210802-loader3

いい感じですね。複数のユーザがリクエストしている様子がわかりますね。(*1)
ただ、これだと全体としての性能がわからないので、いくつかの指標を計算・表示してみます。
*1: レスポンスタイムが激早なのは、同じホスト上にサーバを立てているからです。

全体の性能を測定する

主な負荷テストツールで必ずサポートされている以下の指標を集計してみます。
 ・秒間リクエスト数(最大、平均)
  →1秒間あたり何回のリクエストを実行したか。
 ・レスポンスタイム(最大、最小、平均)
  →リクエストを投げてからレスポンスを得るまでにかかった時間。

各スレッドの実行結果を1箇所に集めることで計算できます。
これを組み込んだのが以下になります。

#!/usr/bin/env ruby

require 'benchmark'
require 'json'
require 'net/http'
require 'pathname'
require 'time'

class Loader
  attr_reader :scenario
  attr_reader :mutex
  attr_reader :results

  def initialize(path)
    abort "シナリオのパスを指定してください。" unless path
    scenario_path = Pathname.new(path)
    abort "パスが見つかりません: #{scenario_path}" unless scenario_path.exist?
    @scenario = JSON.parse(scenario_path.read)
    @mutex = Thread::Mutex.new
    @results = []
  end

  def run
    info_log("試験を開始します。")
    threads = scenario["users"].times.map do |i|
      Thread.new(i) do |index|
        vu = VirtualUser.new(index)

        scenario["requests"].each do |req|
          r = case req["method"]
              when "GET"
                vu.get(req["url"])
              when "POST"
                vu.post(req["url"], req["form_parameters"])
              end

          # レスポンスタイム、リクエスト時間を保存
          mutex.lock
          results << r
          mutex.unlock
        end
      end
    end
    threads.each(&:join)
    analyze
    info_log("試験を終了します。")
  end

  private

  def info_log(msg)
    now = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")
    puts "#{now} [LOADER] #{msg}"
  end

  def analyze
    total_requests = 0
    total_response = 0
    rps = {}
    response_max = nil
    response_min = nil

    results.each do |r|
      rps[r[:time]] ||= 0
      rps[r[:time]] += 1

      # 最大レスポンスタイムを探す
      if !response_max || response_max < r[:response]
        response_max = r[:response]
      end

      # 最小レスポンスタイムを探す
      if !response_min || response_min > r[:response]
        response_min = r[:response]
      end

      # 平均レスポンスタイム計算用にリクエスト回数とレスポンスタイム合計を集計
      total_requests += 1
      total_response += r[:response]
    end

    response_avg = (total_response / total_requests).round(3)
    rps_max = rps.values.max
    rps_avg = (rps.values.sum / rps.size.to_f).round(2)

    info_log("")
    info_log("結果")
    info_log("---------------------------------------")
    info_log("リクエスト回数      : #{total_requests}")
    info_log("最大秒間リクエスト  : #{rps_max} req/秒")
    info_log("平均秒間リクエスト  : #{rps_avg} req/秒")
    info_log("最大レスポンスタイム: #{response_max} ms")
    info_log("最小レスポンスタイム: #{response_min} ms")
    info_log("平均レスポンスタイム: #{response_avg} ms")
    info_log("---------------------------------------")
  end

  class VirtualUser
    attr_reader :id

    def initialize(id)
      @id = id
    end

    def get(url)
      request(Net::HTTP::Get, url, nil)
    end

    def post(url, params)
      request(Net::HTTP::Post, url, params)
    end

    def request(method_class, url, params = nil, headers = nil)
      uri = URI.parse(url)
      http = Net::HTTP.new(uri.host, uri.port)

      if uri.scheme == 'https'
        http.use_ssl = true
      end

      req = nil
      method = nil
      if method_class == Net::HTTP::Get
        method = "GET"
        if params
          uri.query = URI.encode_www_form(params)
        end
        req = method_class.new(uri)
      else
        method = "POST"
        req = method_class.new(uri.path)
        if params
          req.set_form_data(params)
        end
      end

      headers&.each do |key, value|
        req[key] = value
      end

      request_time = Time.now.to_i
      code = nil
      t = Benchmark.realtime do
        res = http.request(req)
        code = res.code
      end
      ms = (t * 1000).round(3)
      info_log("#{method}: #{url} (#{code}, #{ms}ms)")

      # 集計用に結果を返す
      { time: request_time, response: ms }
    end

    private

    def info_log(msg)
      now = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")
      puts "#{now} [LOADER][#{id}] #{msg}"
    end
  end

end

Loader.new(ARGV.shift).run

各スレッドの結果をメインスレッドから参照できるように、インスタンス変数resultsに保存しておきます。
今回はサンプルなのでメモリ上に保存していますが、流行りのツールだとデータベース(TSDB、RDBなど)に保存するのが一般的です.
メモリ上に保存する場合は、マルチスレッドのため忘れずにロックをかけましょう。


実行結果

210802-loader3

いい感じですね。
単一サーバ・単一プロセスに加えてかなり機能が絞られているため、内部の雰囲気は掴みやすいと思います。

繰り返し制御

指定したリクエストを完了すると、すぐに試験テストが終了します。
ある程度の時間は負荷をかけ続けたいので、シナリオ全体を繰り返す機能を追加します。
繰り返しの指定は、回数または秒数にします。

{
  "users": 3,
  "loop_type": "seconds",
  "loop_value": 2,
  "requests": [
    {
      "method": "GET",
      "url": "http://localhost:4567/"
    },
    {
      "method": "GET",
      "url": "http://localhost:4567/login"
    },
    {
      "method": "POST",
      "url": "http://localhost:4567/login",
      "form_parameters": {
        "name": "login_id",
        "password": "password"
      }
    }
  ]
}

loop_typeは、count(繰り返し回数)またはseconds(繰り返し秒数)を指定します。
loop_valueは、loop_typeがcountの場合は繰り返し回数、loop_typeがsecondsの場合は繰り返す秒数とします。
対応したプログラムが以下になります。

#!/usr/bin/env ruby

require 'benchmark'
require 'json'
require 'net/http'
require 'pathname'
require 'time'

class Loader
  attr_reader :scenario
  attr_reader :mutex
  attr_reader :results

  def initialize(path)
    abort "シナリオのパスを指定してください。" unless path
    scenario_path = Pathname.new(path)
    abort "パスが見つかりません: #{scenario_path}" unless scenario_path.exist?
    @scenario = JSON.parse(scenario_path.read)
    @mutex = Thread::Mutex.new
    @results = []
  end

  def run
    info_log("試験を開始します。")
    case scenario["loop_type"]
    when "count"
      scenario["loop_value"].times do
        execute
      end
    when "seconds"
      begin
        Timeout.timeout(scenario["loop_value"]) do
          loop { execute }
        end
      rescue Timeout::Error
        info_log("#{scenario["loop_value"]}秒経過しました。")
      end
    else
      execute
    end

    analyze
    info_log("試験を終了します。")
  end

  def execute
    threads = scenario["users"].times.map do |i|
      Thread.new(i) do |index|
        vu = VirtualUser.new(index)

        scenario["requests"].each do |req|
          r = case req["method"]
              when "GET"
                vu.get(req["url"])
              when "POST"
                vu.post(req["url"], req["form_parameters"])
              end

          # レスポンスタイム、リクエスト時間を保存
          if r
            mutex.synchronize do
              results << r
            end
          end
        end
      end
    end
    threads.each(&:join)
  end

  private

  def info_log(msg)
    now = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")
    puts "#{now} [LOADER] #{msg}"
  end

  def analyze
    total_requests = 0
    total_response = 0
    rps = {}
    response_max = nil
    response_min = nil

    results.each do |r|
      rps[r[:time]] ||= 0
      rps[r[:time]] += 1

      # 最大レスポンスタイムを探す
      if !response_max || response_max < r[:response]
        response_max = r[:response]
      end

      # 最小レスポンスタイムを探す
      if !response_min || response_min > r[:response]
        response_min = r[:response]
      end

      # 平均レスポンスタイム計算用にリクエスト回数とレスポンスタイム合計を集計
      total_requests += 1
      total_response += r[:response]
    end

    response_avg = (total_response / total_requests).round(3)
    rps_max = rps.values.max
    rps_avg = (rps.values.sum / rps.size.to_f).round(2)

    info_log("")
    info_log("結果")
    info_log("---------------------------------------")
    info_log("リクエスト回数      : #{total_requests}")
    info_log("最大秒間リクエスト  : #{rps_max} req/秒")
    info_log("平均秒間リクエスト  : #{rps_avg} req/秒")
    info_log("最大レスポンスタイム: #{response_max} ms")
    info_log("最小レスポンスタイム: #{response_min} ms")
    info_log("平均レスポンスタイム: #{response_avg} ms")
    info_log("---------------------------------------")
  end

  class VirtualUser
    attr_reader :id

    def initialize(id)
      @id = id
    end

    def get(url)
      request(Net::HTTP::Get, url, nil)
    end

    def post(url, params)
      request(Net::HTTP::Post, url, params)
    end

    def request(method_class, url, params = nil, headers = nil)
      uri = URI.parse(url)
      http = Net::HTTP.new(uri.host, uri.port)

      if uri.scheme == 'https'
        http.use_ssl = true
      end

      req = nil
      method = nil
      if method_class == Net::HTTP::Get
        method = "GET"
        if params
          uri.query = URI.encode_www_form(params)
        end
        req = method_class.new(uri)
      else
        method = "POST"
        req = method_class.new(uri.path)
        if params
          req.set_form_data(params)
        end
      end

      headers&.each do |key, value|
        req[key] = value
      end

      request_time = Time.now.to_i
      code = nil
      t = Benchmark.realtime do
        res = http.request(req)
        code = res.code
      end
      ms = (t * 1000).round(3)
      info_log("#{method}: #{url} (#{code}, #{ms}ms)")

      # 集計用に結果を返す
      { time: request_time, response: ms }
    end

    private

    def info_log(msg)
      now = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")
      puts "#{now} [LOADER][#{id}] #{msg}"
    end
  end

end

Loader.new(ARGV.shift).run

回数指定の繰り返しはシナリオ実行箇所をループするだけで、秒数指定の場合は無限ループ内で指定秒数に達したら例外をキャッチして中断します。


実行結果(秒数指定)

210802-loader4-1

210802-loader4-2

2秒だと指定が長過ぎるので途中を省略しましたが、2秒で中断しています。

秒間リクエスト制限

指定のユーザ数で大量のリクエストを発生させることはできましたが、リクエスト間隔がベストエフォートとなっています。
そこで、指定した秒間リクエストは超えないように制限を入れてみます。

{
  "users": 3,
  "limit": 10,
  "loop_type": "seconds",
  "loop_value": 2,
  "requests": [
    {
      "method": "GET",
      "url": "http://localhost:4567/"
    },
    {
      "method": "GET",
      "url": "http://localhost:4567/login"
    },
    {
      "method": "POST",
      "url": "http://localhost:4567/login",
      "form_parameters": {
        "name": "login_id",
        "password": "password"
      }
    }
  ]
}

limitに指定した数値を秒間リクエスト上限として扱います。

制限を入れるために、https://github.com/Shopify/limiterを利用します。
汎用的なスロットリング機構を提供していてコードもコンパクトで使いやすいと思います。
ruby-limiter という名前でrubygemsに登録されているのでgem install ruby-limiterでインストールしましょう。
利用したものが以下になります。

#!/usr/bin/env ruby

require 'benchmark'
require 'json'
require 'net/http'
require 'pathname'
require 'ruby-limiter'
require 'time'

class Loader
  attr_reader :scenario
  attr_reader :mutex
  attr_reader :results

  def initialize(path)
    abort "シナリオのパスを指定してください。" unless path
    scenario_path = Pathname.new(path)
    abort "パスが見つかりません: #{scenario_path}" unless scenario_path.exist?
    @scenario = JSON.parse(scenario_path.read)
    @limiter = @scenario["limit"] ? Limiter::RateQueue.new(@scenario["limit"], interval: 1) : nil
    @mutex = Thread::Mutex.new
    @results = []
  end

  def run
    info_log("試験を開始します。")
    case scenario["loop_type"]
    when "count"
      scenario["loop_value"].times do
        execute
      end
    when "seconds"
      begin
        Timeout.timeout(scenario["loop_value"]) do
          loop { execute }
        end
      rescue Timeout::Error
        info_log("#{scenario["loop_value"]}秒経過しました。")
      end
    else
      execute
    end

    analyze
    info_log("試験を終了します。")
  end

  def execute
    threads = scenario["users"].times.map do |i|
      Thread.new(i) do |index|
        vu = VirtualUser.new(index, limiter: @limiter)

        scenario["requests"].each do |req|
          r = case req["method"]
              when "GET"
                vu.get(req["url"])
              when "POST"
                vu.post(req["url"], req["form_parameters"])
              end

          # レスポンスタイム、リクエスト時間を保存
          if r
            mutex.synchronize do
              results << r
            end
          end
        end
      end
    end
    threads.each(&:join)
  end

  private

  def info_log(msg)
    now = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")
    puts "#{now} [LOADER] #{msg}"
  end

  def analyze
    total_requests = 0
    total_response = 0
    rps = {}
    response_max = nil
    response_min = nil

    results.each do |r|
      rps[r[:time]] ||= 0
      rps[r[:time]] += 1

      # 最大レスポンスタイムを探す
      if !response_max || response_max < r[:response]
        response_max = r[:response]
      end

      # 最小レスポンスタイムを探す
      if !response_min || response_min > r[:response]
        response_min = r[:response]
      end

      # 平均レスポンスタイム計算用にリクエスト回数とレスポンスタイム合計を集計
      total_requests += 1
      total_response += r[:response]
    end

    response_avg = (total_response / total_requests).round(3)
    rps_max = rps.values.max
    rps_avg = (rps.values.sum / rps.size.to_f).round(2)

    info_log("")
    info_log("結果")
    info_log("---------------------------------------")
    info_log("リクエスト回数      : #{total_requests}")
    info_log("最大秒間リクエスト  : #{rps_max} req/秒")
    info_log("平均秒間リクエスト  : #{rps_avg} req/秒")
    info_log("最大レスポンスタイム: #{response_max} ms")
    info_log("最小レスポンスタイム: #{response_min} ms")
    info_log("平均レスポンスタイム: #{response_avg} ms")
    info_log("---------------------------------------")
  end

  class VirtualUser
    attr_reader :id
    attr_reader :limiter

    def initialize(id, options = {})
      @id = id
      @limiter = options[:limiter]
    end

    def get(url)
      request(Net::HTTP::Get, url, nil)
    end

    def post(url, params)
      request(Net::HTTP::Post, url, params)
    end

    def request(method_class, url, params = nil, headers = nil)
      @limiter.shift if @limiter

      uri = URI.parse(url)
      http = Net::HTTP.new(uri.host, uri.port)

      if uri.scheme == 'https'
        http.use_ssl = true
      end

      req = nil
      method = nil
      if method_class == Net::HTTP::Get
        method = "GET"
        if params
          uri.query = URI.encode_www_form(params)
        end
        req = method_class.new(uri)
      else
        method = "POST"
        req = method_class.new(uri.path)
        if params
          req.set_form_data(params)
        end
      end

      headers&.each do |key, value|
        req[key] = value
      end

      request_time = Time.now.to_i
      code = nil
      t = Benchmark.realtime do
        res = http.request(req)
        code = res.code
      end
      ms = (t * 1000).round(3)
      info_log("#{method}: #{url} (#{code}, #{ms}ms)")

      # 集計用に結果を返す
      { time: request_time, response: ms }
    end

    private

    def info_log(msg)
      now = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")
      puts "#{now} [LOADER][#{id}] #{msg}"
    end
  end

end

Loader.new(ARGV.shift).run

<実行結果>

210802-loader6

指定した秒間リクエストの10req/秒の範囲内でリクエストが実行されていることを確認できます。

ユーザ変数とセッション

ログインなど、ユーザによって異なるパラメータを渡したい場合がありますので対応します。
また、クッキーによるセッションとするため、クッキーの保存・送信にも対応します。

{
  "users": 3,
  "interval": 1000,
  "limit": 10,
  "loop_type": "seconds",
  "loop_value": 2,
  "requests": [
    {
      "method": "GET",
      "url": "http://localhost:4567/"
    },
    {
      "method": "GET",
      "url": "http://localhost:4567/login"
    },
    {
      "method": "POST",
      "url": "http://localhost:4567/login",
      "form_parameters": {
        "login_id": "${login_id}",
        "password": "${password}"
      }
    },
    {
      "method": "GET",
      "url": "http://localhost:4567/profile",
      "verbose": true
    }
  ],
  "variables": [
    { "login_id": "user1", "password": "password1" },
    { "login_id": "user2", "password": "password2" },
    { "login_id": "user3", "password": "password3" },
    { "login_id": "user4", "password": "password4" },
    { "login_id": "user5", "password": "password5" }
  ]
}

variablesにハッシュの配列を追加しました。先頭から順番にユーザに割り当てるイメージです。
割り当てた変数は${login_id}, ${password}どでアクセスできるように実装します。

#!/usr/bin/env ruby

require 'benchmark'
require 'json'
require 'net/http'
require 'pathname'
require 'ruby-limiter'
require 'time'

class Loader
  attr_reader :scenario
  attr_reader :mutex
  attr_reader :results

  def initialize(path)
    abort "シナリオのパスを指定してください。" unless path
    scenario_path = Pathname.new(path)
    abort "パスが見つかりません: #{scenario_path}" unless scenario_path.exist?
    @scenario = JSON.parse(scenario_path.read)
    @limiter = @scenario["limit"] ? Limiter::RateQueue.new(@scenario["limit"], interval: 1) : nil
    @mutex = Thread::Mutex.new
    @results = []
  end

  def run
    info_log("試験を開始します。")
    case scenario["loop_type"]
    when "count"
      scenario["loop_value"].times do
        execute
      end
    when "seconds"
      begin
        Timeout.timeout(scenario["loop_value"]) do
          loop { execute }
        end
      rescue Timeout::Error
        info_log("#{scenario["loop_value"]}秒経過しました。")
      end
    else
      execute
    end

    analyze
    info_log("試験を終了します。")
  end

  def execute
    threads = scenario["users"].times.map do |i|
      Thread.new(i) do |index|
        vars = scenario["variables"] ? scenario["variables"][index] : nil
        vu = VirtualUser.new(index, limiter: @limiter, variables: vars)

        scenario["requests"].each do |req|
          r = case req["method"]
              when "GET"
                vu.get(req["url"], { verbose: req["verbose"] })
              when "POST"
                vu.post(req["url"], req["form_parameters"], { verbose: req["verbose"] })
              end

          # レスポンスタイム、リクエスト時間を保存
          if r
            mutex.synchronize do
              results << r
            end
          end
        end
      end
    end
    threads.each(&:join)
  end

  private

  def info_log(msg)
    now = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")
    puts "#{now} [LOADER] #{msg}"
  end

  def analyze
    total_requests = 0
    total_response = 0
    rps = {}
    response_max = nil
    response_min = nil

    results.each do |r|
      rps[r[:time]] ||= 0
      rps[r[:time]] += 1

      # 最大レスポンスタイムを探す
      if !response_max || response_max < r[:response]
        response_max = r[:response]
      end

      # 最小レスポンスタイムを探す
      if !response_min || response_min > r[:response]
        response_min = r[:response]
      end

      # 平均レスポンスタイム計算用にリクエスト回数とレスポンスタイム合計を集計
      total_requests += 1
      total_response += r[:response]
    end

    response_avg = (total_response / total_requests).round(3)
    rps_max = rps.values.max
    rps_avg = (rps.values.sum / rps.size.to_f).round(2)

    info_log("")
    info_log("結果")
    info_log("---------------------------------------")
    info_log("リクエスト回数      : #{total_requests}")
    info_log("最大秒間リクエスト  : #{rps_max} req/秒")
    info_log("平均秒間リクエスト  : #{rps_avg} req/秒")
    info_log("最大レスポンスタイム: #{response_max} ms")
    info_log("最小レスポンスタイム: #{response_min} ms")
    info_log("平均レスポンスタイム: #{response_avg} ms")
    info_log("---------------------------------------")
  end

  class VirtualUser
    attr_reader :id
    attr_reader :limiter
    attr_reader :variables

    def initialize(id, options = {})
      @id = id
      @limiter = options[:limiter]
      @variables = options[:variables]
      @cookie = {}
    end

    def get(url, options = {})
      request("GET", url, nil, nil, options)
    end

    def post(url, params = {}, options = {})
      request("POST", url, resolve_variables(params), nil, options)
    end

    def request(method, url, params = nil, headers = nil, options = {})
      @limiter.shift if @limiter

      uri = URI.parse(url)
      http = create_http(uri)
      req = build_request(method, uri, params, headers)

      request_time = Time.now.to_i
      res = nil
      t = Benchmark.realtime do
        res = http.request(req)
      end
      ms = (t * 1000).round(3)
      info_log("#{method}: #{url} (#{res.code}, #{ms}ms)")
      info_log(res.body) if options[:verbose]

      if res.key?("Set-Cookie")
        res.get_fields("Set-Cookie").each do |field|
          key, value = field[0..field.index(";")].split("=")
          @cookie[key] = value
        end
      end

      # 集計用に結果を返す
      { time: request_time, response: ms }
    end

    private

    def info_log(msg)
      now = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L")
      puts "#{now} [LOADER][#{id}] #{msg}"
    end

    def resolve_variables(original = {})
      params = original.clone
      if variables && variables.keys.length > 0
        params.each do |k, v|
          if v =~ /\A\$\{(\w+)}\Z/
            params[k] = variables[$1]
          end
        end
      end
      params
    end

    def create_http(uri)
      http = Net::HTTP.new(uri.host, uri.port)
      if uri.scheme == 'https'
        http.use_ssl = true
      end
      http
    end

    def build_request(method, uri, params = nil, headers = nil)
      req = create_request(method, uri, params)
      req.add_field("Cookie", cookie_string)

      headers&.each do |key, value|
        req[key] = value
      end
      req
    end

    def create_request(method, uri, params = nil)
      case method
      when /get/i
        uri.query = URI.encode_www_form(params) if params
        Net::HTTP::Get.new(uri)
      when /post/i
        r = Net::HTTP::Post.new(uri.path)
        r.set_form_data(params) if params
        r
      else
        abort "unknown method: #{method}"
      end
    end

    def cookie_string
      @cookie.map { |key, value| "#{key}=#{value}"}.join(";")
    end
  end

end

Loader.new(ARGV.shift).run

実際にはvariables部分をユーザ数分用意するのは大変すぎるので、別途CSVファイルなどから読み出して割当てるような実装が現実的だと思います。


<実行結果>

210802-loader5

認証したユーザに対応した情報が返ってくることを確認できます。

まとめ

負荷試験テストの主要な機能を実装してみました。
今回は設定ファイル(JSON)ベースのシナリオのため、シンプルな実装になったと思います。
反面、自由度はそこまでありませんので、いずれk6のようなJavaScriptベースのシナリオを使った実装についても紹介したいと思います。


一覧へ