Elixir 学习笔记 003

Intermediate

Mix 是 Elixir 社区开发的集包管理、依赖管理、构建工具于一身的开发工具,扩展性极好,功能强大,自带对 Erlang 的支持,可以类比 Golang 自带的 go,详细的使用方式请参考 mix help 以及 mix

我们如果需要创建一个新项目,使用 mix new 命令即可,详细使用方法可以使用 mix help new 查看,对于新建项目,mix 会很友好的创建一系列文件 (其中还包含 .gitignore)

1
mix new example

我们目前只需要关注其中的 mix.exs 就行了,它包含了配置应用、依赖、环境信息、版本等功能,project 函数设置项目相关信息, application 函数在生产应用文件的时候会用到,deps 函数则是定义项目的依赖项

我们需要把所需的依赖全部列入 deps 中,deps 返回一个列表,每一项依赖都写在元组中,格式如下

1
2
3
{app, requirement}
{app, opts}
{app, requirement, opts}
  • app 是一个原子,是依赖项的名称
  • requirement 是一个字符串或正则表达式,用以设定版本
  • opts 是一个 keyword list,设置依赖相关操作

下面列出常用的添加依赖方式

1
2
3
4
5
6
{:plug, ">= 0.4.0"}, # 从 hex.pm 安装版本大于等于 0.4.0 的依赖
{:gettext, git: "https://github.com/elixir-lang/gettext.git", tag: "0.1"}, # 从指定git仓库下载依赖
{:local_dep, path: "/path/to/local/deps"}, # 本地依赖项
{:telemetry, "~> 0.4"}, # 从 hex.pm 安装版本 0.4 的依赖项
{:phoenix_view, github: "phoenixframework/phoenix_view", branch: "master"}, # 从 github 下载依赖 master 分支
{:cowboy, "~> 1.0", only: [:dev, :test]}, # 安装依赖,并只在 dev 与 tst 环境启用

当依赖项写好之后,我们只需要执行命令获取依赖就行

1
mix deps.get

至于环境,mix 默认支持三种

  • :dev 开发环境,默认的环境
  • :test 测试环境,使用 mix test
  • :prod 生产环境,把应用上线到生产环境下的配置项

可以写一个 mix.env 文件,来让 mix 从中获取环境,也可以在命令行中使用 MIX_ENV 来配置环境

1
MIX_ENV=prod mix compile

Elixir 社区提供了相当多的工具,其中之前讲过了文档使用的 ExDoc,今天我们主要说一说测试工具 ExUnit

测试是通过 Elixir 脚本来执行的,所以测试文件的后缀必须是 .exs,在测试之前需要使用 ExUnit.start() 启动 ExUnit (一般 mix 中的 test/test_helper.exs 已经帮我们做了这一步)。当执行 mix test 时,就开始运行项目测试了,测试时除了 case 之外,还会执行文档测试

1
2
3
4
5
6
7
8
# test/example_test.exs
defmodule ExampleTest do
  use ExUnit.Case
  doctest Example
  test "greets the world" do
    assert Example.hello() == :world
  end
end
  • 断言 (assert) 一般用于测试中检查表达式的值是否为真,表达式为假是则会抛出异常,测试失败,下面我们让表达式为假看看测试结果
1
2
3
4
5
6
7
  1) test greets the world (ExampleTest)
     test/example_test.exs:4
     Assertion with != failed, both sides are exactly equal
     code: assert Example.hello() != :world
     left: :world
     stacktrace:
       test/example_test.exs:5: (test)
  • refute 与 assert 的关系就像 unless 与 if,所以它们正好是一对语义相反的断言
  • assert_raise 是错误处理中断言某个错误是否被抛出,而 assert_receive 则是并发当中断言小时是否被发送
  • capture_io 和 capture_log 都是检查应用是否正确输出,不过 _io 检查的是IO输出,_log 检查的是logger输出

Elixir 构建于 BEAM (Erlang VM) 之上,可以直接使用 Erlang 大量的库。Erlang 的模块用小写的原子变量表示,比如 :os:timer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# 使用 Erlang 的模块计算函数运行时间
defmodule Example do
  def time(fun, args) do
    {time, result} = :timer.tc(fun, args)
    IO.puts("Time: #{time} μs\nResult: #{result}")
  end
end
Example.time(&(&1 * &1 * &1), [100])
# Time: 9 μs
# Result: 1000000

至于 Erlang 的第三方模块,我们使用 mix 管理,如果模块不在 hex 中,我们可以将依赖的 git 仓库添加进来,之后就可以愉快的使用这些模块了

需要注意的是,有一些小坑需要注意,比如字符串, Elixir 的字符串是 UTF-8 编码的二进制数据,而 Erlang 的是字符列表

1
2
3
4
is_list('Example') # true
is_list("Example") # false
is_binary("Example") # true
<<"Example">> === "Example" # true
1
2
3
4
is_list('Example'). %% false
is_list("Example"). %% true
is_binary("Example"). %% false
is_binary(<<"Example">>). %% true

Elixir 通常会返回 {:ok, result}{:error, reason} 来表示错误,或者抛出异常, Elixir 社区在返回错误方面有一些约定

  • 对于那些作为一个函数功能相关的错误,这个函数应当相应地返回元组表示错误,如用户输入了一个错误的日期类型值
  • 对于那些和函数功能无关的错误,则需要抛出异常,如无法正确解析配置的参数

通常一些公开的 API 中会有一个带感叹号的版本,这些函数返回一个未封包的结果,或者抛出异常

在学习如何处理异常之前,我们应该学习如何产生一个错误,最简单的方式就是 raise ,它接收错误消息,并产生一个错误;对于产生的错误,我们使用 try/rescue 与模式匹配来处理

1
2
3
4
5
6
7
8
try do
  if elixir, do: raise "Oh NO!!!" # (RuntimeError) Oh NO!!!
  # (ArgumentError) the argument value is invalid
  raise ArgumentError, message: "the argument value is invalid"
rescue
  e in RuntimeError -> "A Runtime Error occurred: #{e.message}"
  e in ArgumentError -> "An Argument Error occurred: #{e.message}"
end

对于无论是否产生错误,都需要在 try/rescue 之后进行的操作,我们使用 after 来执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
try do
  if elixir, do: raise "Oh NO!!!"
rescue
  e in RuntimeError -> "A Runtime Error occurred: #{e.message}"
after
  IO.puts("The End!!!")
end
# when elixir==true:
#     The End!!!
#     "A Runtime Error occurred: Oh NO!!!"

这种情况比较常见的用法是关闭文件、连接等(突然怀念 RAII

感觉遇到了老熟人, throwtry/catch ,这种错误处理可以直接抛出一个值,并从当前执行的流程中退出,catch 可以直接使用这个抛出的值,不过在 Elixir 新代码中用的很少了

1
2
3
4
5
6
7
8
try do
  for x <- 1..10 do
    if rem(x, 3) == 0, do: throw x
  end
catch
  x -> "Caught: #{x}"
end
# "Caught: 3"

Exiting 是 Elixir 提供的最后一种产生错误的方式,产生退出信号直接挂掉,这是 Elixir 容错机制的一部分

1
2
3
4
5
6
fn -> exit "Oops" end.() # (exit) "Oops"
try do
  exit "Oops"
catch
  :exit, _ -> "EXIT!!!"
end

虽然 exit 可以被捕获,但是请不要这样做,把它交给 supervisor 去处理

Elixir 提供了相当的内建错误类型,不过 Elixir 还是提供了自建错误类型的方法,使用 defexception/1 来创建新的错误类型,并通过 :message 来设置默认的错误消息

1
2
3
4
5
6
7
8
9
defmodule ExampleError do
  defexception message: "an example error"
end
try do
  raise ExampleError
rescue
  e in ExampleError -> e.message
end
# "an example error"

得益于 BEAM (Erlang VM),Elixir 对并发的支持很棒,并发模型是 Actors,通过消息传递交互的进程,BEAM 的进程是轻量级的,可以运行在所有 CPU 之上,类似于现在所说的协程。

创建新进程的方式很简单,和 Golang 中的 go 有异曲同工之妙,使用 spawn 即可完成,并且返回一个 pid (进程标识符)

1
2
3
4
5
defmodule Example do
  def add(a, b), do: a + b
end
Example.add(2, 3) # 5
spawn(Example, :add, [2, 3]) # #PID<0.124.0>

BEAM 中的进程间方式仅有消息传递, send 允许我们向 PID 发送消息,而 receive 监听和匹配消息,如果没有匹配的消息,进程会被阻塞。如果你用过 Golang,那你一定熟悉它,因为这和 Golang 中的 chan 很像,但是消息传递中发送方不会被阻塞

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
defmodule Example do
  def listen do
    receive do
      {:ok, "hello"} -> IO.puts("world")
    end
    IO.puts("receive end")
    listen()
  end
end
pid = spawn(Example, :listen, [])
send(pid, {:ok, "hello"})
# world
# receive end
send(pid, :ok)
# :ok

如果进程崩溃了,spawn 就会有问题,因为父进程不会知道子进程出错而导致程序异常,为了解决这个问题,我们需要将父子进程连接起来,这样它们可以收到相互退出的通知

1
2
3
4
5
defmodule Example do
  def explode, do: exit :iris
end
spawn(Example, :explode, []) # #PID<0.150.0>
spawn_link(Example, :explode, []) # (EXIT from #PID<0.107.0>) shell process exited with reason: :iris

有时候我们不希望链接的进程导致当前进程跟着崩溃,这时候就要通过 Process.flag/2 函数捕捉进程的错误退出,这个函数用 Erlang 的 process_flag/2 的 trap_exit 信号。当捕获到被链接的进程发生错误退出时 (trap_exit 设为 true), 就会收到像 {:EXIT, from_pid, reason} 这样的三元组形式的退出信号

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
defmodule Example do
  def explode, do: exit :iris
  def run do
    Process.flag(:trap_exit, true)
    spawn_link(Example, :explode, [])
    receive do
      {:EXIT, _from_pid, reason} -> IO.puts("Exit reason: #{reason}")
    end
  end
end
Example.run() # Exit reason: iris

如果不希望链接两个进程,但是仍然希望获得错误信息通知,那么就需要监控这个进程,即 spawn_monitor,不需要捕获进程,也不会导致当前进程崩溃

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
defmodule Example do
  def explode, do: exit :iris
  def run do
    spawn_monitor(Example, :explode, [])
    receive do
      {:DOWN, _ref, :process, _from_pid, reason} -> IO.puts("Exit reason: #{reason}")
    end
  end
end
Example.run() # Exit reason: iris

Agent 是 同步的 Promise/Future 的抽象,函数的返回值就是 Agent 的状态,我们可以通过 PID 来获取它,当然也可以通过命名来获取

1
2
3
4
5
{:ok, agent} = Agent.start_link(fn -> [1, 2, 3] end) # {:ok, #PID<0.109.0>}
Agent.update(agent, fn (state) -> state ++ [4, 5] end)
Agent.get(agent, &(&1)) # [1, 2, 3, 4, 5]
{:ok, agent} = Agent.start_link(fn -> [1, 2, 3] end, name: Numbers)
Agent.get(Numbers, &(&1)) # [1, 2, 3]

Task 是 异步的 Promise/Future 抽象,与 Agent 类似,但是提供了更多的异步操作

1
2
3
4
5
6
7
8
9
defmodule Example do
  def add(a, b) do
    :timer.sleep(3000) # sleep 3 s
    a + b
  end
end
task = Task.async(Example, :add, [5, 6]) # 异步执行 Example.add
:timer.sleep(2000)
Task.await(task) # 获取 Example.add 的运行结果

OTP server 包含了 GenServer 的主要行为和一系列 callbacks,GenServer 是一个专门监控和控制进程状态、启停等的抽象,属于 BEAM/OTP 的一部分,它是一个循环,每次迭代都会处理一个带有目标状态的请求

我们先学习最简单的 GenServer 的使用,即启动与初始化,我们使用 GenServer 简单的实现一个队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
defmodule SimpleQueue do
  use GenServer
  @doc """
  Start our queue and link it. This is a helper function
  """
  def start_link(state \\ []) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  @doc """
  GenServer.init/1 callback
  """
  def init(state), do: {:ok, state}
end

我们如果需要使用 GenServer 同步调用,则需要实现 GenServer.handle_call/3 函数,接受请求、调用者PID以及初始状态,期望的返回值为 {:reply, response, state}

1
2
3
4
5
6
7
8
9
@doc """
GenServer.handle_call/3 callback
"""
def handle_call(:dequeue, _from, [value | state]), do: {:reply, value, state}
def handle_call(:dequeue, _from, []), do: {:reply, nil, []}
def handle_call(:queue, _from, state), do: {:reply, state, state}

def queue(), do: GenServer.call(__MODULE__, :queue)
def dequeue(), do: GenServer.call(__MODULE__, :dequeue)

同步操作时,当调用 :dequeue 函数,就会从队列中取出头部,并将尾部保存为状态等待下一次使用,最终返回头部,当队列为空时则什么都不做。:queue 函数则只会展示当前状态,不会改变状态

1
2
3
4
5
6
SimpleQueue.start_link([1, 2, 3]) # {:ok, #PID<0.136.0>}
SimpleQueue.dequeue() # 1
SimpleQueue.queue() # [2, 3]
SimpleQueue.dequeue() # 2
SimpleQueue.dequeue() # 3
SimpleQueue.queue() # []

如果想实现异步操作,那么需要实现类似的 handle_cast/2 函数,它不接受调用者为参数,且没有返回值,剩下的与同步调用时几乎一致

1
2
3
4
5
6
@doc """
GenServer.handle_cast/2 callback
"""
def handle_cast({:enqueue, value}, state), do: {:noreply, state ++ [value]}

def enqueue(value), do: GenServer.cast(__MODULE__, {:enqueue, value})

Supervisors 是一种特殊的进程,它专门监控其他进程,并自动重启出错的子进程,从而实现容错性高的程序。Supervisors 的魔力主要在 Supervisor.start_link/2 函数,这个函数除了能启动 supervisor 和子进程之外,它还允许我们设置管理子进程的策略

使用 mix new simple_queue --sup 命令,我们创建了拥有 supervisor 树的新项目, SimpleQueue 的代码放在 lib/simple_queue.ex,supervisor 的代码我们将添加到 lib/simple_queue/application.ex 中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
defmodule SimpleQueue.Application do
  use Application
  def start(_type, _args) do
    children = [SimpleQueue]
    # 如果有配置项,可以使用元组来配置 children
    # children = [{SimpleQueue, [1, 2, 3]}]
    opts = [strategy: :one_for_one, name: SimpleQueue.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

如果我们的 SimpleQueue 进程崩溃了,或者被终止了,Supervisor 会自动重启这个进程,重启策略有三种

  • :one_for_one:只重启失败的子进程
  • :one_for_all:当错误事件出现时,重启所有子进程
  • :rest_for_one:重启失败的子进程,以及所有在它后面启动的进程

当 Supervisor 进程启动后,它必须知道如何操作子进程,所以每个子模块都应该有 child_spec/1 函数来定义操作行为,不过幸运的是,如果使用了 use GenServer / use Supervisoruse Agent 会自动帮我们定义好这些行为,如果需要自己定义的话

1
2
3
4
5
6
7
8
9
def child_spec(opts) do
  %{
    id: SimpleQueue,
    start: {__MODULE__, :start_link, [opts]},
    shutdown: 5_000,
    restart: :permanent,
    type: :worker,
  }
end

我们下来来说说这些参数都是什么

  • id:Supervisor 用于定位子进程的 specification,必选
  • start:被 Supervisor 启动时,需要调用的 Module / Function / Arguments,必选
  • shutdown:子进程关闭时的行为,可选
    • :brutal_kill 子进程立即停止
    • :infinity Supervisor 将会无限期等待,这是 :supervisor 进程类型的默认值
    • 任意正整数,以 ms 为单位的等待时间,超时后将杀掉子进程, :work 进程类型的默认值为 5000
  • restart:子进程崩溃时的处理方式,可选
    • :permanent 总是重启子进程,这是默认值
    • :temporary 绝不重启子进程
    • :transient 只有在非正常中止的时候才重启子进程
  • type:进程的类型 :worker:supervisor ,默认 :worker,可选

Supervisor 通常在应用启动时伴随子进程启动,但有时候被监管的子进程在应用启动时还是 未知的 (如 web 应用中启动了一个新进程处理用户连接),我们需要一个能按需启动子进程的 Supervisor,这正是 DynamicSupervisor 的使用场景

我们不指定子进程,我们只要定义好运行时的选项即可,不过 DynamicSupervisor 只支持 :one_for_one 这一种监管策略

1
2
options = [strategy: :one_for_one, name: SimpleQueue.Supervisor]
DynamicSupervisor.start_link(options)

我们需要使用 start_child/2 函数来动态启动新的 SimpleQueue 子进程,这个函数接收一个 supervisor 和子进程 specification 作为参数 (SimpleQueue 使用了 use GenServer,所以子进程的 specification 已经定义好了)

1
{:ok, pid} = DynamicSupervisor.start_child(SimpleQueue.Supervisor, SimpleQueue)

Task 有自己特殊的 Supervisor,它是专门为动态创建的任务而设计的 supervisor,内部实际使用的是 DynamicSupervisor

Task.Supervisor 与其他 Supervisor 在使用上没有什么区别,与 Supervisor 主要的区别时默认重启策略的不同,Task.Supervisor 默认重启策略为 :temporary

1
2
3
4
children = [
  {Task.Supervisor, name: ExampleApp.TaskSupervisor, restart: :transient}
]
{:ok, pid} = Supervisor.start_link(children, [strategy: :one_for_one])

当创建好 Task.Supervisor 后,我们可以使用 start_child/2 来创建受监管的 task。如果我们的任务过早地崩溃掉,它会被自动启动。这个功能在处理大量涌来的请求或者后台工作的时候非常有用

1
{:ok, pid} = Task.Supervisor.start_child(ExampleApp.TaskSupervisor, fn -> background_work() end)