|
|
require 'thread'
module Pic
# Keeps an ordered list of [pattern, handler] pairs and hands a message to the
# first pair whose pattern matches it.
class Receiver
  # Builds the initial pattern list in one of three ways:
  # * arguments given    -> one pattern made of +args+, handled by +block+ (may be nil);
  # * only a block given -> the receiver is yielded so patterns can be added
  #                         through #match / #when;
  # * nothing given      -> one empty pattern with no handler.
  def initialize(*args, &block) #:nodoc:
    @waiters = []
    if !args.empty?
      @waiters << [args, block]
    elsif block
      yield self
    else
      @waiters << [[], nil]
    end
  end

  # Registers +args+ as a pattern together with its optional handler block.
  # Always returns nil. Also available as #when.
  def match(*args, &block)
    @waiters.push([args, block])
    nil
  end
  alias :when :match

  # Tries the registered patterns in registration order against +message+.
  #
  # Returns a one-element array holding the handler's result (or the message
  # itself when the matching pattern has no handler), or nil when no pattern
  # matches. The array wrapper lets a handler result of nil/false be told
  # apart from "no match".
  def process(message) #:nodoc:
    @waiters.each do |pattern, handler|
      next unless match?(message, pattern)

      return [handler ? handler.call(*message) : message]
    end
    nil
  end

  private

  # Truthy when every pattern element is === to the message element at the
  # same index. An empty pattern therefore matches any message.
  def match?(message, against)
    against.each_with_index.all? { |expected, index| expected === message[index] }
  end
end
# Handle through which messages are delivered to a wrapped process object.
#
# Two proxies wrapping the same process compare equal (==, eql?) and share a
# hash value, so they are interchangeable as Hash keys.
class Proxy
  # process:: the object messages are delivered to; it must respond to a
  #           (possibly private) +__queue+ method.
  def initialize(process)
    @process = process
  end

  # Delivers +args+, as a single array, to the wrapped process.
  #
  # NOTE: this intentionally shadows Object#send on the proxy itself.
  # Returns whatever the process's +__queue+ returns.
  def send(*args)
    puts "Sending to #{self}: #{args.inspect[0..200]}" if $DEBUG
    # Goes through the wrapped object's own #send so that a private __queue
    # can still be reached.
    @process.send :__queue, args
  end

  # String form of the wrapped process.
  def to_s
    @process.to_s
  end

  # True only when +other+ is a Proxy wrapping an equal process.
  def ==(other)
    other.is_a?(Proxy) && other.process == @process
  end
  alias eql? ==

  # Consistent with #==: derived from the wrapped process.
  def hash
    @process.hash
  end

  protected

  # The wrapped process, visible only to other proxies for comparison.
  attr_reader :process
end
# A Thread subclass that behaves like a lightweight process: it has a pid, a
# message queue filled through Pic::Proxy#send, and a selective #receive.
class Process < Thread
  GROUP = ThreadGroup.new
  # Guards @processes and @last_pid. Used instead of Thread.exclusive, which
  # is deprecated and no longer available on modern Ruby.
  LOCK = Mutex.new

  class << self
    # Returns an array of the processes that are currently registered.
    def list
      LOCK.synchronize { @processes.values }
    end

    # Starts a new process running +callable+ (the block, or else the first
    # argument) with the remaining +args+.
    #
    # Does not return until the new process has its queue and pid and is
    # registered, so a message sent right after this call cannot hit a
    # missing queue and the exit hook cannot overlook the process.
    #
    # Raises ArgumentError when neither a block nor a callable first argument
    # is given. Returns the new process.
    def start(*args, &callable)
      unless callable
        callable = args.shift
        raise ArgumentError, "Must provide block or callable first argument" unless callable.respond_to?(:call)
      end
      ready = Queue.new
      thread = super(*args) do |*call_args|
        process = Thread.current
        begin
          register(process)
        ensure
          # Always release the caller, even if registration failed.
          ready << true
        end
        run(process, callable, call_args)
      end
      ready.pop
      thread
    end

    private

    # Adds +process+ to GROUP, gives it a queue and the next pid, and records
    # it in the process table.
    def register(process)
      GROUP.add process
      process.instance_variable_set(:@queue, Queue.new)
      LOCK.synchronize do
        pid = @last_pid += 1
        process.instance_variable_set(:@pid, pid)
        @processes[pid] = process
      end
    end

    # Runs +callable+ and then any continuation installed through #tail,
    # repeating until no continuation is left. StandardErrors end the
    # process quietly (reported on $stderr only when $DEBUG is set). The
    # process is always removed from the process table afterwards.
    def run(process, callable, args)
      callable.call(*args)
      while (pending = process.instance_variable_get(:@tail))
        # Clear first so the continuation can install its own successor.
        process.instance_variable_set(:@tail, nil)
        pending.first.call(*pending.last)
      end
    rescue => ex
      if $DEBUG
        $stderr.puts "Error in process #{process}:"
        $stderr.puts "#{process}: #{ex.message}"
        $stderr.puts "#{process}: #{ex.backtrace[0..20]}"
      end
    ensure
      LOCK.synchronize { @processes.delete(process.pid) }
    end
  end

  # pid => process table and the last pid handed out (class-level state).
  @processes = {}
  @last_pid = 0

  # The pid assigned when the process was started.
  attr_reader :pid

  # Blocks until a queued message matches, then returns the handler result
  # (or the message itself when the matching pattern has no handler).
  # +args+ and +block+ are interpreted by Pic::Receiver.new.
  #
  # Messages that do not match are taken off the queue while waiting and put
  # back at the end of the queue before this method returns.
  #
  # Raises RuntimeError when called from any thread other than this one.
  def receive(*args, &block)
    raise RuntimeError, "Can only call on current thread" unless Thread.current == self
    receiver = Pic::Receiver.new(*args, &block)
    puts "#{self}: in receive" if $DEBUG
    skipped = []
    queue = @queue
    begin
      loop do
        message = queue.deq
        result = receiver.process(message)
        return result.first if result
        skipped << message
      end
    ensure
      skipped.each { |msg| queue.enq msg }
    end
  end

  # Installs +callable+ (the block, or else the first argument) to be invoked
  # with the remaining +args+ once the currently running body returns.
  #
  # Raises RuntimeError when called from another thread and ArgumentError
  # when no callable is supplied.
  def tail(*args, &callable)
    raise RuntimeError, "Can only call on current thread" unless Thread.current == self
    unless callable
      callable = args.shift
      raise ArgumentError, "Must provide block or callable first argument" unless callable.respond_to?(:call)
    end
    @tail = [callable, args]
  end

  # Returns a new Pic::Proxy wrapping this process.
  def proxy
    Pic::Proxy.new(self)
  end

  private

  # Appends +message+ to this process's queue (invoked by Pic::Proxy#send).
  def __queue(message)
    @queue.enq message
  end

  # At interpreter exit, wait for every registered process to finish. Each
  # process removes itself from the table when done, so the loop ends once
  # the table is empty.
  at_exit do
    loop do
      remaining = LOCK.synchronize { @processes.values.first }
      break unless remaining
      remaining.join
    end
  end
end
end
# Private convenience methods made available everywhere through Kernel.
module Kernel
  private

  # Starts a Pic::Process with the given arguments/block and returns a
  # Pic::Proxy wrapping it.
  # NOTE: this replaces Ruby's built-in Kernel#spawn.
  def spawn(*args, &block)
    started = Pic::Process.start(*args, &block)
    Pic::Proxy.new(started)
  end

  # Forwards to #receive on the current thread.
  def receive(*args, &block)
    running = Thread.current
    running.receive(*args, &block)
  end

  # Returns the current thread's #proxy.
  def current
    running = Thread.current
    running.proxy
  end

  # Forwards to #tail on the current thread.
  def tail(*args, &block)
    running = Thread.current
    running.tail(*args, &block)
  end
end
|