目录

Elixir Intermediate

系列 - Elixir 学习笔记

Mix 是 Elixir 社区开发的集包管理、依赖管理、构建工具于一身的开发工具,扩展性极好,功能强大,自带对 Erlang 的支持,可以类比 Golang 自带的 go,详细的使用方式请参考 mix help 以及 mix

我们如果需要创建一个新项目,使用 mix new 命令即可,详细使用方法可以使用 mix help new 查看,对于新建项目,mix 会很友好的创建一系列文件 (其中还包含 .gitignore)

shell

mix new example

我们目前只需要关注其中的 mix.exs 就行了,它包含了配置应用、依赖、环境信息、版本等功能,project 函数设置项目相关信息, application 函数在生产应用文件的时候会用到,deps 函数则是定义项目的依赖项

我们需要把所需的依赖全部列入 deps 中,deps 返回一个列表,每一项依赖都写在元组中,格式如下

elixir

{app, requirement}
{app, opts}
{app, requirement, opts}
  • app 是一个原子,是依赖项的名称
  • requirement 是一个字符串或正则表达式,用以设定版本
  • opts 是一个 keyword list,设置依赖相关操作

下面列出常用的添加依赖方式

elixir

{:plug, ">= 0.4.0"}, # 从 hex.pm 安装版本大于等于 0.4.0 的依赖
{:gettext, git: "https://github.com/elixir-lang/gettext.git", tag: "0.1"}, # 从指定git仓库下载依赖
{:local_dep, path: "/path/to/local/deps"}, # 本地依赖项
{:telemetry, "~> 0.4"}, # 从 hex.pm 安装版本 0.4 的依赖项
{:phoenix_view, github: "phoenixframework/phoenix_view", branch: "master"}, # 从 github 下载依赖 master 分支
{:cowboy, "~> 1.0", only: [:dev, :test]}, # 安装依赖,并只在 dev 与 tst 环境启用

当依赖项写好之后,我们只需要执行命令获取依赖就行

shell

mix deps.get

至于环境,mix 默认支持三种

  • :dev 开发环境,默认的环境
  • :test 测试环境,使用 mix test
  • :prod 生产环境,把应用上线到生产环境下的配置项

可以写一个 mix.env 文件,来让 mix 从中获取环境,也可以在命令行中使用 MIX_ENV 来配置环境

shell

MIX_ENV=prod mix compile

Elixir 社区提供了相当多的工具,其中之前讲过了文档使用的 ExDoc,今天我们主要说一说测试工具 ExUnit

测试是通过 Elixir 脚本来执行的,所以测试文件的后缀必须是 .exs,在测试之前需要使用 ExUnit.start() 启动 ExUnit (一般 mix 中的 test/test_helper.exs 已经帮我们做了这一步)。当执行 mix test 时,就开始运行项目测试了,测试时除了 case 之外,还会执行文档测试

elixir

# test/example_test.exs
defmodule ExampleTest do
  use ExUnit.Case
  doctest Example
  test "greets the world" do
    assert Example.hello() == :world
  end
end
  • 断言 (assert) 一般用于测试中检查表达式的值是否为真,表达式为假是则会抛出异常,测试失败,下面我们让表达式为假看看测试结果

text

1) test greets the world (ExampleTest)
   test/example_test.exs:4
   Assertion with != failed, both sides are exactly equal
   code: assert Example.hello() != :world
   left: :world
   stacktrace:
     test/example_test.exs:5: (test)
  • refute 与 assert 的关系就像 unless 与 if,所以它们正好是一对语义相反的断言
  • assert_raise 是错误处理中断言某个错误是否被抛出,而 assert_receive 则是并发当中断言小时是否被发送
  • capture_io 和 capture_log 都是检查应用是否正确输出,不过 _io 检查的是IO输出,_log 检查的是logger输出

Elixir 构建于 BEAM (Erlang VM) 之上,可以直接使用 Erlang 大量的库。Erlang 的模块用小写的原子变量表示,比如 :os:timer

elixir

# 使用 Erlang 的模块计算函数运行时间
defmodule Example do
  def time(fun, args) do
    {time, result} = :timer.tc(fun, args)
    IO.puts("Time: #{time} μs\nResult: #{result}")
  end
end
Example.time(&(&1 * &1 * &1), [100])
# Time: 9 μs
# Result: 1000000

至于 Erlang 的第三方模块,我们使用 mix 管理,如果模块不在 hex 中,我们可以将依赖的 git 仓库添加进来,之后就可以愉快的使用这些模块了

需要注意的是,有一些小坑需要注意,比如字符串, Elixir 的字符串是 UTF-8 编码的二进制数据,而 Erlang 的是字符列表

elixir

is_list('Example') # true
is_list("Example") # false
is_binary("Example") # true
<<"Example">> === "Example" # true

erlang

is_list('Example'). %% false
is_list("Example"). %% true
is_binary("Example"). %% false
is_binary(<<"Example">>). %% true

Elixir 通常会返回 {:ok, result}{:error, reason} 来表示错误,或者抛出异常, Elixir 社区在返回错误方面有一些约定

  • 对于那些作为一个函数功能相关的错误,这个函数应当相应地返回元组表示错误,如用户输入了一个错误的日期类型值
  • 对于那些和函数功能无关的错误,则需要抛出异常,如无法正确解析配置的参数

通常一些公开的 API 中会有一个带感叹号的版本,这些函数返回一个未封包的结果,或者抛出异常

在学习如何处理异常之前,我们应该学习如何产生一个错误,最简单的方式就是 raise ,它接收错误消息,并产生一个错误;对于产生的错误,我们使用 try/rescue 与模式匹配来处理

elixir

try do
  if elixir, do: raise "Oh NO!!!" # (RuntimeError) Oh NO!!!
  # (ArgumentError) the argument value is invalid
  raise ArgumentError, message: "the argument value is invalid"
rescue
  e in RuntimeError -> "A Runtime Error occurred: #{e.message}"
  e in ArgumentError -> "An Argument Error occurred: #{e.message}"
end

对于无论是否产生错误,都需要在 try/rescue 之后进行的操作,我们使用 after 来执行

elixir

try do
  if elixir, do: raise "Oh NO!!!"
rescue
  e in RuntimeError -> "A Runtime Error occurred: #{e.message}"
after
  IO.puts("The End!!!")
end
# when elixir==true:
#     The End!!!
#     "A Runtime Error occurred: Oh NO!!!"

这种情况比较常见的用法是关闭文件、连接等(突然怀念 RAII

感觉遇到了老熟人, throwtry/catch ,这种错误处理可以直接抛出一个值,并从当前执行的流程中退出,catch 可以直接使用这个抛出的值,不过在 Elixir 新代码中用的很少了

elixir

try do
  for x <- 1..10 do
    if rem(x, 3) == 0, do: throw x
  end
catch
  x -> "Caught: #{x}"
end
# "Caught: 3"

Exiting 是 Elixir 提供的最后一种产生错误的方式,产生退出信号直接挂掉,这是 Elixir 容错机制的一部分

elixir

fn -> exit "Oops" end.() # (exit) "Oops"
try do
  exit "Oops"
catch
  :exit, _ -> "EXIT!!!"
end

虽然 exit 可以被捕获,但是请不要这样做,把它交给 supervisor 去处理

Elixir 提供了相当的内建错误类型,不过 Elixir 还是提供了自建错误类型的方法,使用 defexception/1 来创建新的错误类型,并通过 :message 来设置默认的错误消息

elixir

defmodule ExampleError do
  defexception message: "an example error"
end
try do
  raise ExampleError
rescue
  e in ExampleError -> e.message
end
# "an example error"

得益于 BEAM (Erlang VM),Elixir 对并发的支持很棒,并发模型是 Actors,通过消息传递交互的进程,BEAM 的进程是轻量级的,可以运行在所有 CPU 之上,类似于现在所说的协程。

创建新进程的方式很简单,和 Golang 中的 go 有异曲同工之妙,使用 spawn 即可完成,并且返回一个 pid (进程标识符)

elixir

defmodule Example do
  def add(a, b), do: a + b
end
Example.add(2, 3) # 5
spawn(Example, :add, [2, 3]) # #PID<0.124.0>

BEAM 中的进程间方式仅有消息传递, send 允许我们向 PID 发送消息,而 receive 监听和匹配消息,如果没有匹配的消息,进程会被阻塞。如果你用过 Golang,那你一定熟悉它,因为这和 Golang 中的 chan 很像,但是消息传递中发送方不会被阻塞

elixir

defmodule Example do
  def listen do
    receive do
      {:ok, "hello"} -> IO.puts("world")
    end
    IO.puts("receive end")
    listen()
  end
end
pid = spawn(Example, :listen, [])
send(pid, {:ok, "hello"})
# world
# receive end
send(pid, :ok)
# :ok

如果进程崩溃了,spawn 就会有问题,因为父进程不会知道子进程出错而导致程序异常,为了解决这个问题,我们需要将父子进程连接起来,这样它们可以收到相互退出的通知

elixir

defmodule Example do
  def explode, do: exit :iris
end
spawn(Example, :explode, []) # #PID<0.150.0>
spawn_link(Example, :explode, []) # (EXIT from #PID<0.107.0>) shell process exited with reason: :iris

有时候我们不希望链接的进程导致当前进程跟着崩溃,这时候就要通过 Process.flag/2 函数捕捉进程的错误退出,这个函数用 Erlang 的 process_flag/2 的 trap_exit 信号。当捕获到被链接的进程发生错误退出时 (trap_exit 设为 true), 就会收到像 {:EXIT, from_pid, reason} 这样的三元组形式的退出信号

elixir

defmodule Example do
  def explode, do: exit :iris
  def run do
    Process.flag(:trap_exit, true)
    spawn_link(Example, :explode, [])
    receive do
      {:EXIT, _from_pid, reason} -> IO.puts("Exit reason: #{reason}")
    end
  end
end
Example.run() # Exit reason: iris

如果不希望链接两个进程,但是仍然希望获得错误信息通知,那么就需要监控这个进程,即 spawn_monitor,不需要捕获进程,也不会导致当前进程崩溃

elixir

defmodule Example do
  def explode, do: exit :iris
  def run do
    spawn_monitor(Example, :explode, [])
    receive do
      {:DOWN, _ref, :process, _from_pid, reason} -> IO.puts("Exit reason: #{reason}")
    end
  end
end
Example.run() # Exit reason: iris

Agent 是 同步的 Promise/Future 的抽象,函数的返回值就是 Agent 的状态,我们可以通过 PID 来获取它,当然也可以通过命名来获取

elixir

{:ok, agent} = Agent.start_link(fn -> [1, 2, 3] end) # {:ok, #PID<0.109.0>}
Agent.update(agent, fn (state) -> state ++ [4, 5] end)
Agent.get(agent, &(&1)) # [1, 2, 3, 4, 5]
{:ok, agent} = Agent.start_link(fn -> [1, 2, 3] end, name: Numbers)
Agent.get(Numbers, &(&1)) # [1, 2, 3]

Task 是 异步的 Promise/Future 抽象,与 Agent 类似,但是提供了更多的异步操作

elixir

defmodule Example do
  def add(a, b) do
    :timer.sleep(3000) # sleep 3 s
    a + b
  end
end
task = Task.async(Example, :add, [5, 6]) # 异步执行 Example.add
:timer.sleep(2000)
Task.await(task) # 获取 Example.add 的运行结果

OTP server 包含了 GenServer 的主要行为和一系列 callbacks,GenServer 是一个专门监控和控制进程状态、启停等的抽象,属于 BEAM/OTP 的一部分,它是一个循环,每次迭代都会处理一个带有目标状态的请求

我们先学习最简单的 GenServer 的使用,即启动与初始化,我们使用 GenServer 简单的实现一个队列

elixir

defmodule SimpleQueue do
  use GenServer
  @doc """
  Start our queue and link it. This is a helper function
  """
  def start_link(state \\ []) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  @doc """
  GenServer.init/1 callback
  """
  def init(state), do: {:ok, state}
end

我们如果需要使用 GenServer 同步调用,则需要实现 GenServer.handle_call/3 函数,接受请求、调用者PID以及初始状态,期望的返回值为 {:reply, response, state}

elixir

@doc """
GenServer.handle_call/3 callback
"""
def handle_call(:dequeue, _from, [value | state]), do: {:reply, value, state}
def handle_call(:dequeue, _from, []), do: {:reply, nil, []}
def handle_call(:queue, _from, state), do: {:reply, state, state}

def queue(), do: GenServer.call(__MODULE__, :queue)
def dequeue(), do: GenServer.call(__MODULE__, :dequeue)

同步操作时,当调用 :dequeue 函数,就会从队列中取出头部,并将尾部保存为状态等待下一次使用,最终返回头部,当队列为空时则什么都不做。:queue 函数则只会展示当前状态,不会改变状态

elixir

SimpleQueue.start_link([1, 2, 3]) # {:ok, #PID<0.136.0>}
SimpleQueue.dequeue() # 1
SimpleQueue.queue() # [2, 3]
SimpleQueue.dequeue() # 2
SimpleQueue.dequeue() # 3
SimpleQueue.queue() # []

如果想实现异步操作,那么需要实现类似的 handle_cast/2 函数,它不接受调用者为参数,且没有返回值,剩下的与同步调用时几乎一致

elixir

@doc """
GenServer.handle_cast/2 callback
"""
def handle_cast({:enqueue, value}, state), do: {:noreply, state ++ [value]}

def enqueue(value), do: GenServer.cast(__MODULE__, {:enqueue, value})

Supervisors 是一种特殊的进程,它专门监控其他进程,并自动重启出错的子进程,从而实现容错性高的程序。Supervisors 的魔力主要在 Supervisor.start_link/2 函数,这个函数除了能启动 supervisor 和子进程之外,它还允许我们设置管理子进程的策略

使用 mix new simple_queue --sup 命令,我们创建了拥有 supervisor 树的新项目, SimpleQueue 的代码放在 lib/simple_queue.ex,supervisor 的代码我们将添加到 lib/simple_queue/application.ex 中

elixir

defmodule SimpleQueue.Application do
  use Application
  def start(_type, _args) do
    children = [SimpleQueue]
    # 如果有配置项,可以使用元组来配置 children
    # children = [{SimpleQueue, [1, 2, 3]}]
    opts = [strategy: :one_for_one, name: SimpleQueue.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

如果我们的 SimpleQueue 进程崩溃了,或者被终止了,Supervisor 会自动重启这个进程,重启策略有三种

  • :one_for_one:只重启失败的子进程
  • :one_for_all:当错误事件出现时,重启所有子进程
  • :rest_for_one:重启失败的子进程,以及所有在它后面启动的进程

当 Supervisor 进程启动后,它必须知道如何操作子进程,所以每个子模块都应该有 child_spec/1 函数来定义操作行为,不过幸运的是,如果使用了 use GenServer / use Supervisoruse Agent 会自动帮我们定义好这些行为,如果需要自己定义的话

elixir

def child_spec(opts) do
  %{
    id: SimpleQueue,
    start: {__MODULE__, :start_link, [opts]},
    shutdown: 5_000,
    restart: :permanent,
    type: :worker,
  }
end

我们下来来说说这些参数都是什么

  • id:Supervisor 用于定位子进程的 specification,必选
  • start:被 Supervisor 启动时,需要调用的 Module / Function / Arguments,必选
  • shutdown:子进程关闭时的行为,可选
    • :brutal_kill 子进程立即停止
    • :infinity Supervisor 将会无限期等待,这是 :supervisor 进程类型的默认值
    • 任意正整数,以 ms 为单位的等待时间,超时后将杀掉子进程, :work 进程类型的默认值为 5000
  • restart:子进程崩溃时的处理方式,可选
    • :permanent 总是重启子进程,这是默认值
    • :temporary 绝不重启子进程
    • :transient 只有在非正常中止的时候才重启子进程
  • type:进程的类型 :worker:supervisor ,默认 :worker,可选

Supervisor 通常在应用启动时伴随子进程启动,但有时候被监管的子进程在应用启动时还是 未知的 (如 web 应用中启动了一个新进程处理用户连接),我们需要一个能按需启动子进程的 Supervisor,这正是 DynamicSupervisor 的使用场景

我们不指定子进程,我们只要定义好运行时的选项即可,不过 DynamicSupervisor 只支持 :one_for_one 这一种监管策略

elixir

options = [strategy: :one_for_one, name: SimpleQueue.Supervisor]
DynamicSupervisor.start_link(options)

我们需要使用 start_child/2 函数来动态启动新的 SimpleQueue 子进程,这个函数接收一个 supervisor 和子进程 specification 作为参数 (SimpleQueue 使用了 use GenServer,所以子进程的 specification 已经定义好了)

elixir

{:ok, pid} = DynamicSupervisor.start_child(SimpleQueue.Supervisor, SimpleQueue)

Task 有自己特殊的 Supervisor,它是专门为动态创建的任务而设计的 supervisor,内部实际使用的是 DynamicSupervisor

Task.Supervisor 与其他 Supervisor 在使用上没有什么区别,与 Supervisor 主要的区别时默认重启策略的不同,Task.Supervisor 默认重启策略为 :temporary

elixir

children = [
  {Task.Supervisor, name: ExampleApp.TaskSupervisor, restart: :transient}
]
{:ok, pid} = Supervisor.start_link(children, [strategy: :one_for_one])

当创建好 Task.Supervisor 后,我们可以使用 start_child/2 来创建受监管的 task。如果我们的任务过早地崩溃掉,它会被自动启动。这个功能在处理大量涌来的请求或者后台工作的时候非常有用

elixir

{:ok, pid} = Task.Supervisor.start_child(ExampleApp.TaskSupervisor, fn -> background_work() end)