Mix 是 Elixir 社区开发的集包管理、依赖管理、构建工具于一身的开发工具,扩展性极好,功能强大,自带对 Erlang 的支持,可以类比 Golang 自带的 go,详细的使用方式请参考
mix help
以及 mix
我们如果需要创建一个新项目,使用 mix new
命令即可,详细使用方法可以使用 mix
help new 查看,对于新建项目,mix 会很友好的创建一系列文件 (其中还包含
.gitignore
)
我们目前只需要关注其中的 mix.exs 就行了,它包含了配置应用、依赖、环境信息、版本等功能,project
函数设置项目相关信息, application
函数在生产应用文件的时候会用到,deps
函数则是定义项目的依赖项
我们需要把所需的依赖全部列入 deps 中,deps 返回一个列表,每一项依赖都写在元组中,格式如下
1
2
3
|
{app, requirement}
{app, opts}
{app, requirement, opts}
|
app
是一个原子,是依赖项的名称
requirement
是一个字符串或正则表达式,用以设定版本
opts
是一个 keyword list,设置依赖相关操作
下面列出常用的添加依赖方式
1
2
3
4
5
6
|
{:plug, ">= 0.4.0"}, # 从 hex.pm 安装版本大于等于 0.4.0 的依赖
{:gettext, git: "https://github.com/elixir-lang/gettext.git", tag: "0.1"}, # 从指定git仓库下载依赖
{:local_dep, path: "/path/to/local/deps"}, # 本地依赖项
{:telemetry, "~> 0.4"}, # 从 hex.pm 安装版本 0.4 的依赖项
{:phoenix_view, github: "phoenixframework/phoenix_view", branch: "master"}, # 从 github 下载依赖 master 分支
{:cowboy, "~> 1.0", only: [:dev, :test]}, # 安装依赖,并只在 dev 与 tst 环境启用
|
当依赖项写好之后,我们只需要执行命令获取依赖就行
至于环境,mix 默认支持三种
:dev
开发环境,默认的环境
:test
测试环境,使用 mix test
:prod
生产环境,把应用上线到生产环境下的配置项
可以写一个 mix.env 文件,来让 mix 从中获取环境,也可以在命令行中使用 MIX_ENV
来配置环境
1
|
MIX_ENV=prod mix compile
|
Elixir 社区提供了相当多的工具,其中之前讲过了文档使用的 ExDoc,今天我们主要说一说测试工具 ExUnit
测试是通过 Elixir 脚本来执行的,所以测试文件的后缀必须是 .exs,在测试之前需要使用 ExUnit.start()
启动 ExUnit (一般 mix 中的 test/test_helper.exs
已经帮我们做了这一步)。当执行 mix test
时,就开始运行项目测试了,测试时除了 case 之外,还会执行文档测试
1
2
3
4
5
6
7
8
|
# test/example_test.exs
defmodule ExampleTest do
use ExUnit.Case
doctest Example
test "greets the world" do
assert Example.hello() == :world
end
end
|
- 断言 (assert) 一般用于测试中检查表达式的值是否为真,表达式为假是则会抛出异常,测试失败,下面我们让表达式为假看看测试结果
1
2
3
4
5
6
7
|
1) test greets the world (ExampleTest)
test/example_test.exs:4
Assertion with != failed, both sides are exactly equal
code: assert Example.hello() != :world
left: :world
stacktrace:
test/example_test.exs:5: (test)
|
- refute 与 assert 的关系就像 unless 与 if,所以它们正好是一对语义相反的断言
- assert_raise 是错误处理中断言某个错误是否被抛出,而 assert_receive 则是并发当中断言小时是否被发送
- capture_io 和 capture_log 都是检查应用是否正确输出,不过
_io
检查的是IO输出,_log
检查的是logger输出
Elixir 构建于 BEAM (Erlang VM) 之上,可以直接使用 Erlang 大量的库。Erlang 的模块用小写的原子变量表示,比如 :os
和 :timer
1
2
3
4
5
6
7
8
9
10
|
# 使用 Erlang 的模块计算函数运行时间
defmodule Example do
def time(fun, args) do
{time, result} = :timer.tc(fun, args)
IO.puts("Time: #{time} μs\nResult: #{result}")
end
end
Example.time(&(&1 * &1 * &1), [100])
# Time: 9 μs
# Result: 1000000
|
至于 Erlang 的第三方模块,我们使用 mix 管理,如果模块不在 hex 中,我们可以将依赖的 git 仓库添加进来,之后就可以愉快的使用这些模块了
需要注意的是,有一些小坑需要注意,比如字符串, Elixir 的字符串是 UTF-8 编码的二进制数据,而 Erlang 的是字符列表
1
2
3
4
|
is_list('Example') # true
is_list("Example") # false
is_binary("Example") # true
<<"Example">> === "Example" # true
|
1
2
3
4
|
is_list('Example'). %% false
is_list("Example"). %% true
is_binary("Example"). %% false
is_binary(<<"Example">>). %% true
|
Elixir 通常会返回 {:ok, result}
或 {:error, reason}
来表示错误,或者抛出异常,
Elixir 社区在返回错误方面有一些约定
- 对于那些作为一个函数功能相关的错误,这个函数应当相应地返回元组表示错误,如用户输入了一个错误的日期类型值
- 对于那些和函数功能无关的错误,则需要抛出异常,如无法正确解析配置的参数
通常一些公开的 API 中会有一个带感叹号的版本,这些函数返回一个未封包的结果,或者抛出异常
在学习如何处理异常之前,我们应该学习如何产生一个错误,最简单的方式就是 raise
,它接收错误消息,并产生一个错误;对于产生的错误,我们使用 try/rescue
与模式匹配来处理
1
2
3
4
5
6
7
8
|
try do
if elixir, do: raise "Oh NO!!!" # (RuntimeError) Oh NO!!!
# (ArgumentError) the argument value is invalid
raise ArgumentError, message: "the argument value is invalid"
rescue
e in RuntimeError -> "A Runtime Error occurred: #{e.message}"
e in ArgumentError -> "An Argument Error occurred: #{e.message}"
end
|
对于无论是否产生错误,都需要在 try/rescue
之后进行的操作,我们使用 after 来执行
1
2
3
4
5
6
7
8
9
10
|
try do
if elixir, do: raise "Oh NO!!!"
rescue
e in RuntimeError -> "A Runtime Error occurred: #{e.message}"
after
IO.puts("The End!!!")
end
# when elixir==true:
# The End!!!
# "A Runtime Error occurred: Oh NO!!!"
|
这种情况比较常见的用法是关闭文件、连接等(突然怀念 RAII
感觉遇到了老熟人, throw
和 try/catch
,这种错误处理可以直接抛出一个值,并从当前执行的流程中退出,catch 可以直接使用这个抛出的值,不过在 Elixir 新代码中用的很少了
1
2
3
4
5
6
7
8
|
try do
for x <- 1..10 do
if rem(x, 3) == 0, do: throw x
end
catch
x -> "Caught: #{x}"
end
# "Caught: 3"
|
Exiting 是 Elixir 提供的最后一种产生错误的方式,产生退出信号直接挂掉,这是
Elixir 容错机制的一部分
1
2
3
4
5
6
|
fn -> exit "Oops" end.() # (exit) "Oops"
try do
exit "Oops"
catch
:exit, _ -> "EXIT!!!"
end
|
虽然 exit 可以被捕获,但是请不要这样做,把它交给 supervisor 去处理
Elixir 提供了相当的内建错误类型,不过 Elixir 还是提供了自建错误类型的方法,使用
defexception/1 来创建新的错误类型,并通过 :message
来设置默认的错误消息
1
2
3
4
5
6
7
8
9
|
defmodule ExampleError do
defexception message: "an example error"
end
try do
raise ExampleError
rescue
e in ExampleError -> e.message
end
# "an example error"
|
得益于 BEAM (Erlang VM),Elixir 对并发的支持很棒,并发模型是 Actors,通过消息传递交互的进程,BEAM 的进程是轻量级的,可以运行在所有 CPU 之上,类似于现在所说的协程。
创建新进程的方式很简单,和 Golang 中的 go 有异曲同工之妙,使用 spawn
即可完成,并且返回一个 pid (进程标识符)
1
2
3
4
5
|
defmodule Example do
def add(a, b), do: a + b
end
Example.add(2, 3) # 5
spawn(Example, :add, [2, 3]) # #PID<0.124.0>
|
BEAM 中的进程间方式仅有消息传递, send
允许我们向 PID 发送消息,而 receive
监听和匹配消息,如果没有匹配的消息,进程会被阻塞。如果你用过 Golang,那你一定熟悉它,因为这和 Golang 中的 chan 很像,但是消息传递中发送方不会被阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
defmodule Example do
def listen do
receive do
{:ok, "hello"} -> IO.puts("world")
end
IO.puts("receive end")
listen()
end
end
pid = spawn(Example, :listen, [])
send(pid, {:ok, "hello"})
# world
# receive end
send(pid, :ok)
# :ok
|
如果进程崩溃了,spawn 就会有问题,因为父进程不会知道子进程出错而导致程序异常,为了解决这个问题,我们需要将父子进程连接起来,这样它们可以收到相互退出的通知
1
2
3
4
5
|
defmodule Example do
def explode, do: exit :iris
end
spawn(Example, :explode, []) # #PID<0.150.0>
spawn_link(Example, :explode, []) # (EXIT from #PID<0.107.0>) shell process exited with reason: :iris
|
有时候我们不希望链接的进程导致当前进程跟着崩溃,这时候就要通过 Process.flag/2
函数捕捉进程的错误退出,这个函数用 Erlang 的 process_flag/2 的 trap_exit 信号。当捕获到被链接的进程发生错误退出时 (trap_exit 设为 true), 就会收到像 {:EXIT,
from_pid, reason} 这样的三元组形式的退出信号
1
2
3
4
5
6
7
8
9
10
11
|
defmodule Example do
def explode, do: exit :iris
def run do
Process.flag(:trap_exit, true)
spawn_link(Example, :explode, [])
receive do
{:EXIT, _from_pid, reason} -> IO.puts("Exit reason: #{reason}")
end
end
end
Example.run() # Exit reason: iris
|
如果不希望链接两个进程,但是仍然希望获得错误信息通知,那么就需要监控这个进程,即
spawn_monitor
,不需要捕获进程,也不会导致当前进程崩溃
1
2
3
4
5
6
7
8
9
10
|
defmodule Example do
def explode, do: exit :iris
def run do
spawn_monitor(Example, :explode, [])
receive do
{:DOWN, _ref, :process, _from_pid, reason} -> IO.puts("Exit reason: #{reason}")
end
end
end
Example.run() # Exit reason: iris
|
Agent 是 同步的 Promise/Future 的抽象,函数的返回值就是 Agent 的状态,我们可以通过 PID 来获取它,当然也可以通过命名来获取
1
2
3
4
5
|
{:ok, agent} = Agent.start_link(fn -> [1, 2, 3] end) # {:ok, #PID<0.109.0>}
Agent.update(agent, fn (state) -> state ++ [4, 5] end)
Agent.get(agent, &(&1)) # [1, 2, 3, 4, 5]
{:ok, agent} = Agent.start_link(fn -> [1, 2, 3] end, name: Numbers)
Agent.get(Numbers, &(&1)) # [1, 2, 3]
|
Task 是 异步的 Promise/Future 抽象,与 Agent 类似,但是提供了更多的异步操作
1
2
3
4
5
6
7
8
9
|
defmodule Example do
def add(a, b) do
:timer.sleep(3000) # sleep 3 s
a + b
end
end
task = Task.async(Example, :add, [5, 6]) # 异步执行 Example.add
:timer.sleep(2000)
Task.await(task) # 获取 Example.add 的运行结果
|
OTP server 包含了 GenServer 的主要行为和一系列 callbacks,GenServer 是一个专门监控和控制进程状态、启停等的抽象,属于 BEAM/OTP 的一部分,它是一个循环,每次迭代都会处理一个带有目标状态的请求
我们先学习最简单的 GenServer 的使用,即启动与初始化,我们使用 GenServer 简单的实现一个队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
defmodule SimpleQueue do
use GenServer
@doc """
Start our queue and link it. This is a helper function
"""
def start_link(state \\ []) do
GenServer.start_link(__MODULE__, state, name: __MODULE__)
end
@doc """
GenServer.init/1 callback
"""
def init(state), do: {:ok, state}
end
|
我们如果需要使用 GenServer 同步调用,则需要实现 GenServer.handle_call/3
函数,接受请求、调用者PID以及初始状态,期望的返回值为 {:reply, response, state}
1
2
3
4
5
6
7
8
9
|
@doc """
GenServer.handle_call/3 callback
"""
def handle_call(:dequeue, _from, [value | state]), do: {:reply, value, state}
def handle_call(:dequeue, _from, []), do: {:reply, nil, []}
def handle_call(:queue, _from, state), do: {:reply, state, state}
def queue(), do: GenServer.call(__MODULE__, :queue)
def dequeue(), do: GenServer.call(__MODULE__, :dequeue)
|
同步操作时,当调用 :dequeue 函数,就会从队列中取出头部,并将尾部保存为状态等待下一次使用,最终返回头部,当队列为空时则什么都不做。:queue
函数则只会展示当前状态,不会改变状态
1
2
3
4
5
6
|
SimpleQueue.start_link([1, 2, 3]) # {:ok, #PID<0.136.0>}
SimpleQueue.dequeue() # 1
SimpleQueue.queue() # [2, 3]
SimpleQueue.dequeue() # 2
SimpleQueue.dequeue() # 3
SimpleQueue.queue() # []
|
如果想实现异步操作,那么需要实现类似的 handle_cast/2
函数,它不接受调用者为参数,且没有返回值,剩下的与同步调用时几乎一致
1
2
3
4
5
6
|
@doc """
GenServer.handle_cast/2 callback
"""
def handle_cast({:enqueue, value}, state), do: {:noreply, state ++ [value]}
def enqueue(value), do: GenServer.cast(__MODULE__, {:enqueue, value})
|
Supervisors 是一种特殊的进程,它专门监控其他进程,并自动重启出错的子进程,从而实现容错性高的程序。Supervisors 的魔力主要在 Supervisor.start_link/2
函数,这个函数除了能启动 supervisor 和子进程之外,它还允许我们设置管理子进程的策略
使用 mix new simple_queue --sup
命令,我们创建了拥有 supervisor 树的新项目,
SimpleQueue 的代码放在 lib/simple_queue.ex,supervisor 的代码我们将添加到
lib/simple_queue/application.ex 中
1
2
3
4
5
6
7
8
9
10
|
defmodule SimpleQueue.Application do
use Application
def start(_type, _args) do
children = [SimpleQueue]
# 如果有配置项,可以使用元组来配置 children
# children = [{SimpleQueue, [1, 2, 3]}]
opts = [strategy: :one_for_one, name: SimpleQueue.Supervisor]
Supervisor.start_link(children, opts)
end
end
|
如果我们的 SimpleQueue 进程崩溃了,或者被终止了,Supervisor 会自动重启这个进程,重启策略有三种
:one_for_one
:只重启失败的子进程
:one_for_all
:当错误事件出现时,重启所有子进程
:rest_for_one
:重启失败的子进程,以及所有在它后面启动的进程
当 Supervisor 进程启动后,它必须知道如何操作子进程,所以每个子模块都应该有
child_spec/1
函数来定义操作行为,不过幸运的是,如果使用了 use GenServer
/
use Supervisor
和 use Agent
会自动帮我们定义好这些行为,如果需要自己定义的话
1
2
3
4
5
6
7
8
9
|
def child_spec(opts) do
%{
id: SimpleQueue,
start: {__MODULE__, :start_link, [opts]},
shutdown: 5_000,
restart: :permanent,
type: :worker,
}
end
|
我们下来来说说这些参数都是什么
- id:Supervisor 用于定位子进程的 specification,必选
- start:被 Supervisor 启动时,需要调用的 Module / Function / Arguments,必选
- shutdown:子进程关闭时的行为,可选
:brutal_kill
子进程立即停止
:infinity
Supervisor 将会无限期等待,这是 :supervisor 进程类型的默认值
- 任意正整数,以 ms 为单位的等待时间,超时后将杀掉子进程, :work 进程类型的默认值为 5000
- restart:子进程崩溃时的处理方式,可选
:permanent
总是重启子进程,这是默认值
:temporary
绝不重启子进程
:transient
只有在非正常中止的时候才重启子进程
- type:进程的类型
:worker
或 :supervisor
,默认 :worker,可选
Supervisor 通常在应用启动时伴随子进程启动,但有时候被监管的子进程在应用启动时还是 未知的 (如 web 应用中启动了一个新进程处理用户连接),我们需要一个能按需启动子进程的 Supervisor,这正是 DynamicSupervisor 的使用场景
我们不指定子进程,我们只要定义好运行时的选项即可,不过 DynamicSupervisor 只支持
:one_for_one
这一种监管策略
1
2
|
options = [strategy: :one_for_one, name: SimpleQueue.Supervisor]
DynamicSupervisor.start_link(options)
|
我们需要使用 start_child/2 函数来动态启动新的 SimpleQueue 子进程,这个函数接收一个 supervisor 和子进程 specification 作为参数 (SimpleQueue 使用了 use GenServer,所以子进程的 specification 已经定义好了)
1
|
{:ok, pid} = DynamicSupervisor.start_child(SimpleQueue.Supervisor, SimpleQueue)
|
Task 有自己特殊的 Supervisor,它是专门为动态创建的任务而设计的 supervisor,内部实际使用的是 DynamicSupervisor
Task.Supervisor 与其他 Supervisor 在使用上没有什么区别,与 Supervisor 主要的区别时默认重启策略的不同,Task.Supervisor 默认重启策略为 :temporary
1
2
3
4
|
children = [
{Task.Supervisor, name: ExampleApp.TaskSupervisor, restart: :transient}
]
{:ok, pid} = Supervisor.start_link(children, [strategy: :one_for_one])
|
当创建好 Task.Supervisor 后,我们可以使用 start_child/2
来创建受监管的 task。如果我们的任务过早地崩溃掉,它会被自动启动。这个功能在处理大量涌来的请求或者后台工作的时候非常有用
1
|
{:ok, pid} = Task.Supervisor.start_child(ExampleApp.TaskSupervisor, fn -> background_work() end)
|