Ruby Concurrency
매번 어플리케이션을 만들 때면 동시성에 대한 고민을 하지 않을 수가 없습니다. 기능적인 구현에 직접 필요한 경우도 있고 퍼포먼스를 결정 지을 수 있는 요소이기 때문에 중요하다고 생각합니다.
개인적으로 대학생 때 여러가지 패턴을 배우고 프로젝트에 적용했었지만 현업 이후 부턴 보안 엔지니어링이 메인 Job이다 보니 실질적으론 Thread-pool 패턴의 사용 빈도가 압도적으로 높았던 것 같습니다.
제가 Golang은 예전에 goroutine과 sync 관련 글을 쓴 적이 있었지만 Ruby에선 따로 쓴적이 없었던 것 같네요.
오늘은 Ruby에서의 동시성 처리에 대한 이야기를 하려고 합니다.
Why Gem?
오늘 작성하는 내용은 Ruby의 Concurrent-ruby 라는 Gem을 이용한 방법들입니다. 물론 Ruby 언어에서 제공하는 기능들로도 충분히 구현해서 사용할 수 있겠지만, 이는 안정성이나 보안적인 측면에서 잘 만들어진 Gem 라이브러리보다 좋을 수 없기 때문에 가급적인 라이브러리로 처리하는 것이 좋습니다.
Concurrent-ruby
Concurrent-ruby는 Ruby에서 동시성 처리를 위한 라이브러리입니다. 보통 많이 사용하는 Promise, Async 등의 기능과 여러가지 Thread pool(worker pool), Scheduler 같은 Concurrency pattern을 가지고 있습니다.
Modern concurrency tools including agents, futures, promises, thread pools, supervisors, and more. Inspired by Erlang, Clojure, Scala, Go, Java, JavaScript, and classic concurrency patterns.
Features
지원하는 기능이 많지만 추려서 정리하면 아래와 같습니다. 적절히 잘 활용한다면 직접 구현하는 것 보다 시간도 아끼고 안정성도 챙길 수 있습니다.
- Default
- Async
- ScheduledTask
- TimerTask
- Promises
- Thread-safe objects, struct, conllection, immutable object and struct
- Thread Synchronization
- 등 다수
- Edge
- Actor
- Channel
- LazyRegister
- 등 다수
Versioning
버전은 크게 3가지로 나뉩니다. 안정화 버전인 concurrent-ruby와 추가적인 기능이 존재하는 edge로 나뉩니다.
concurrent-ruby
uses Semantic Versioningconcurrent-ruby-ext
has always same version asconcurrent-ruby
concurrent-ruby-edge
will always be 0.y.z therefore follo
Installation and Usage
In Shell
sudo gem install concurrent-ruby concurrent-ruby-edge
In Gemfile
gem 'concurrent-ruby'
In Code
require 'concurrent-ruby'
Case study
Thread Pool
Worker-pool로도 불리고 제가 가장 자주 사용하는 동시성 패턴입니다. 작업 큐를 중간에 두고 Provider가 작업을 넘겨주고, Consumer가 작업을 꺼내서 처리하는 형태입니다.
Golang에선 goroutine과 channel을 이용해 쉽게 구현할 수 있었고 Ruby에서도 유사하게 Channel을 사용하여 구현할 수 있습니다.
require 'concurrent-edge'
Channel = Concurrent::Channel
# worker 작업 정의
def worker(id, jobs, results)
jobs.each do |j|
print "worker #{id} processing job #{j}\n"
sleep(1)
results << j * 2
end
end
# job 정보 전달할 채널 생성
jobs = Channel.new(buffer: :buffered, capacity: 100)
# 결과 받을 채널 생성
results = Channel.new(buffer: :buffered, capacity: 100)
# 3개의 워커 구성
(1..3).each do |w|
Channel.go { worker(w, jobs, results) }
end
# jobs로 작업 리스트를 채워줌
(1..20).each do |j|
jobs << j
end
jobs.close
# 결과 출력
(1..20).each do
~results
end
Schedule Task
require 'concurrent'
# task 생성
# 인자값으로 2가 들어갔기 때문에 명시된 작업은 2초 뒤에 실행 됨
task = Concurrent::ScheduledTask.new(2){ puts 'this is task!' }
# 현재 task 상태는 unscheduled
puts "task.state => #{task.state}"
# execute 이후 pending으로 변화
task.execute
puts "task.state => #{task.state}"
# 작업 실행되도록 3초를 기다려 줌
# 중간에 작업이 싫애되면 'this is task!' 가 출력 됨
sleep(3)
# 이후 state를 확인하면 fulfilled로 변경 됨
puts "task.state => #{task.state}"
Timer Task
require 'concurrent'
# 5초마다 현재 시간(Time.now)를 반환하는 TimerTask를 구성
task = Concurrent::TimerTask.new(
dup_on_deref: true,
execution_interval: 5
){ Time.now }
# 시작
task.execute
# 비교를 위해 현재 시간 출력
puts Time.now
# 10초 기다리고 task의 결과 값을 출력
# interval로 인해 5초 뒤에 시간이 기록 됨
sleep(10)
puts task.value
Promise
Promises를 통해 동시 처리가 가능합니다.
require 'concurrent'
# runner는 sleep(1) 이후에 인자 값을 출력하고 값을 리턴함
runner = ->(target) { sleep(1); puts target; target }
# 대상은 3개를 넣어주고
targets = ["a","b","c"]
# 3개에 대해 비동기 처리로 바로 시작
jobs = targets.map do |target|
Concurrent::Promises.future { runner.call(target) }.
then {|data| "success: #{data}"}.
rescue {|err| puts err}
# then으로 완료 후 작업을 지정할 수 있음. 리턴된 값에 success 문자를 붙여서 다시 리턴
# 에러는 rescue로 받아서 처리할 수 있음
end
p Concurrent::Promises.zip(*jobs).value