# Report abuse


			
require 'thread'

module Pic

  # Holds an ordered list of [pattern, handler] pairs and dispatches a
  # message (an Array) to the first pair whose pattern matches it.
  #
  # A pattern is an Array compared element-by-element against the message
  # with ===; only as many positions as the pattern has are checked, so an
  # empty pattern matches every message.
  class Receiver
    # With arguments: registers them as a single pattern, using the block
    # (possibly nil) as its handler.
    # With only a block: yields the receiver so the block can register
    # patterns through #match / #when.
    # With nothing: registers a catch-all pattern without a handler.
    def initialize(*args, &block) #:nodoc:
      @waiters = []
      if !args.empty?
        @waiters.push([args, block])
      elsif block
        block.call(self)
      else
        @waiters.push([[], nil])
      end
    end

    # Appends a pattern and its optional handler block. Always returns nil.
    def match(*args, &block)
      @waiters.push([args, block])
      nil
    end
    alias_method :when, :match

    # Looks up the first registered pattern matching +message+.
    #
    # Returns nil when nothing matches. Otherwise returns a one-element
    # Array holding the handler's result (the handler receives the message
    # splatted), or the message itself when the pattern has no handler.
    # The wrapping keeps a nil/false handler result distinguishable from
    # "no match".
    def process(message) #:nodoc:
      hit = @waiters.find { |pattern, _handler| match?(message, pattern) }
      return nil unless hit

      handler = hit[1]
      handler ? [handler.call(*message)] : [message]
    end

  private

    # True when every element of +against+ accepts (via ===) the element at
    # the same index of +message+.
    def match?(message, against)
      against.each_with_index.all? { |expected, idx| expected === message[idx] }
    end
  end


  # Handle through which messages are posted to a process' mailbox.
  #
  # Two proxies wrapping the same process compare equal (==, eql?) and hash
  # identically, so they are interchangeable as Hash keys or Set members.
  class Proxy
    # process:: the object whose private +__queue+ method receives messages.
    def initialize(process)
      @process = process
    end

    # Posts +args+ (delivered as one Array) to the wrapped process.
    #
    # Note that this shadows Object#send on Proxy instances; use __send__
    # for dynamic dispatch on a proxy itself.
    def send(*args)
      puts "Sending to #{self}: #{args.inspect[0..200]}" if $DEBUG
      # Object#send on the process is used because __queue is private there.
      @process.send :__queue, args
    end

    # String form of the wrapped process.
    def to_s
      @process.to_s
    end

    # True only for another Proxy that wraps an equal process.
    def ==(other)
      other.is_a?(Proxy) && other.instance_variable_get(:@process) == @process
    end
    alias_method :eql?, :==

    # Hash consistent with #eql?: derived from the wrapped process only.
    def hash
      [Proxy, @process].hash
    end

  end


  # A Thread subclass that owns a mailbox (a Queue) and is tracked, by an
  # integer pid, in a class-level registry for as long as it runs.
  class Process < Thread

    # Thread group every started process adds itself to.
    GROUP = ThreadGroup.new

    # Guards the @processes registry and the @last_pid counter.
    # A Mutex is used because Thread.exclusive is not available on
    # Ruby 3.0 and later.
    LOCK = Mutex.new

    class << self

      # Returns the processes currently present in the registry.
      def list
        @processes.values
      end

      # Starts a new process running +callable+ (the block, or else the first
      # argument) with the remaining +args+.
      #
      # When the callable returns, any callable scheduled through #tail is
      # run next, repeatedly, until none is pending. A StandardError raised
      # by the body ends the process; it is reported on $stderr only when
      # $DEBUG is set.
      #
      # Returns the new Process once its mailbox, pid and registry entry are
      # in place, so the caller can post messages to it immediately.
      #
      # Raises ArgumentError when neither a block nor a callable first
      # argument is given.
      def start(*args, &callable)
        unless callable
          callable = args.shift
          raise ArgumentError, "Must provide block or callable first argument" unless callable.respond_to?(:call)
        end

        # Handshake: the new thread signals here after finishing its setup.
        ready = Queue.new

        thread = super(*args) do |*call_args|
          process = Thread.current
          GROUP.add process
          process.instance_variable_set(:@queue, Queue.new)
          LOCK.synchronize do
            pid = @last_pid += 1
            process.instance_variable_set(:@pid, pid)
            @processes[pid] = process
          end
          ready.enq true

          begin
            # Invoke through #call: that is the method the argument check
            # above guarantees, whereas #[] is only defined by Proc/Method.
            callable.call(*call_args)
            while (pending = process.instance_variable_get(:@tail))
              # Clear first so the callable may schedule another tail.
              process.instance_variable_set(:@tail, nil)
              pending.first.call(*pending.last)
            end
          rescue => ex
            if $DEBUG
              # Inside this block +self+ is the class, so name the failing
              # process explicitly.
              $stderr.puts "Error in process #{process}:"
              $stderr.puts "#{process}: #{ex.message}"
              $stderr.puts "#{process}: #{ex.backtrace[0..20]}"
            end
          ensure
            LOCK.synchronize { @processes.delete(process.pid) }
          end
        end

        # Without this wait a message posted right after start could reach a
        # process that has no mailbox yet, and the exit hook could miss a
        # process that has not registered itself yet.
        ready.deq
        thread
      end

    end

    @processes = {}
    @last_pid = 0

    # Integer id assigned when the process was started.
    attr_reader :pid

    # Blocks until a message matching the patterns described by +args+ and
    # +block+ (see Pic::Receiver) arrives, then returns the receiver's result
    # for it.
    #
    # Messages that do not match are set aside and re-enqueued when the call
    # finishes; they are appended to the back of the mailbox, so they end up
    # behind any message that arrived in the meantime.
    #
    # Raises RuntimeError unless called from the process' own thread.
    def receive(*args, &block)
      raise RuntimeError, "Can only call on current thread" unless Thread.current == self
      receiver = Pic::Receiver.new(*args, &block)
      puts "#{self}: in receive" if $DEBUG
      skipped = []
      begin
        loop do
          message = @queue.deq
          result = receiver.process(message)
          return result.first if result
          skipped << message
        end
      ensure
        skipped.each { |msg| @queue.enq msg }
      end
    end

    # Schedules +callable+ (the block, or else the first argument) to be run
    # with the remaining +args+ after the currently running body returns.
    # A later call before that point replaces the pending one.
    #
    # Raises RuntimeError unless called from the process' own thread, and
    # ArgumentError when no callable is supplied.
    def tail(*args, &callable)
      raise RuntimeError, "Can only call on current thread" unless Thread.current == self
      unless callable
        callable = args.shift
        raise ArgumentError, "Must provide block or callable first argument" unless callable.respond_to?(:call)
      end
      @tail = [callable, args]
    end

    # Returns a new Pic::Proxy for posting messages to this process.
    def proxy
      Pic::Proxy.new(self)
    end

  private

    # Appends +message+ to the mailbox; reached via Pic::Proxy#send.
    def __queue(message)
      @queue.enq message
    end

    # At interpreter exit, wait for every registered process to finish.
    # Each process removes itself from the registry when it ends, so the
    # loop stops once the registry is empty.
    at_exit do
      loop do
        process = LOCK.synchronize { @processes.values.first }
        break unless process
        process.join
      end
    end

  end

end


# Convenience entry points, available as private methods on every object.
module Kernel

  # Starts a Pic::Process with the given arguments/block and returns a
  # Pic::Proxy wrapping it. Note: this replaces the built-in Kernel#spawn.
  def spawn(*args, &block)
    process = Pic::Process.start(*args, &block)
    Pic::Proxy.new(process)
  end

  # Forwards to #receive on the current thread.
  def receive(*args, &block)
    running = Thread.current
    running.receive(*args, &block)
  end

  # Returns the current thread's proxy.
  def current
    running = Thread.current
    running.proxy
  end

  # Forwards to #tail on the current thread.
  def tail(*args, &block)
    running = Thread.current
    running.tail(*args, &block)
  end

  private :spawn, :receive, :current, :tail
end