/*
 * Decompiled with CFR 0.152.
 */
package org.jruby.ext.thread;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyHash;
import org.jruby.RubyMarshal;
import org.jruby.RubyModule;
import org.jruby.RubyNumeric;
import org.jruby.RubyObject;
import org.jruby.RubyThread;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.api.Access;
import org.jruby.api.Convert;
import org.jruby.api.Error;
import org.jruby.ast.util.ArgsUtil;
import org.jruby.exceptions.RaiseException;
import org.jruby.runtime.Helpers;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.Visibility;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.marshal.DataType;
import org.jruby.util.TypeConverter;

@JRubyClass(name={"Queue"})
public class Queue
extends RubyObject
implements DataType {
    protected volatile boolean closed = false;
    protected volatile int capacity;
    protected final AtomicInteger count = new AtomicInteger();
    transient Node head;
    protected transient Node last;
    protected final ReentrantLock takeLock = new ReentrantLock();
    protected final Condition notEmpty = this.takeLock.newCondition();
    protected final ReentrantLock putLock = new ReentrantLock();
    protected final Condition notFull = this.putLock.newCondition();
    private static final RubyThread.Task<Queue, IRubyObject> BLOCKING_TAKE_TASK = new RubyThread.Task<Queue, IRubyObject>(){

        @Override
        public IRubyObject run(ThreadContext context, Queue queue) throws InterruptedException {
            while (true) {
                try {
                    return queue.takeInternal(context);
                }
                catch (InterruptedException ie) {
                    context.blockingThreadPoll();
                    continue;
                }
                break;
            }
        }

        @Override
        public void wakeup(RubyThread thread2, Queue queue) {
            thread2.getNativeThread().interrupt();
        }
    };

    protected void signalNotEmpty() {
        ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            this.notEmpty.signal();
        }
        finally {
            takeLock.unlock();
        }
    }

    protected void signalNotFull() {
        ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            this.notFull.signal();
        }
        finally {
            putLock.unlock();
        }
    }

    protected void enqueue(Node node) {
        this.last = this.last.next = node;
    }

    protected IRubyObject dequeue() {
        Node h = this.head;
        Node first2 = h.next;
        h.next = h;
        this.head = first2;
        IRubyObject x = first2.item;
        first2.item = null;
        return x;
    }

    protected void fullyLock() {
        this.putLock.lock();
        this.takeLock.lock();
    }

    protected void fullyUnlock() {
        this.takeLock.unlock();
        this.putLock.unlock();
    }

    @Deprecated(since="10.0")
    protected void initializedCheck() {
        this.initializedCheck(this.getCurrentContext());
    }

    protected void initializedCheck(ThreadContext context) {
        if (this.capacity == 0) {
            throw Error.typeError(context, String.valueOf(this) + " not initialized");
        }
    }

    public Queue(Ruby runtime2, RubyClass type2) {
        super(runtime2, type2);
        this.last = this.head = new Node(null);
    }

    public static RubyClass setup(ThreadContext context, RubyClass Thread2, RubyClass Object2) {
        return (RubyClass)Object2.setConstant(context, "Queue", (IRubyObject)((RubyModule)((RubyModule)((RubyClass)Thread2.defineClassUnder(context, "Queue", Object2, Queue::new)).reifiedClass(Queue.class)).defineMethods(context, Queue.class)).undefMethods(context, "initialize_copy"));
    }

    public static RubyClass setupError(ThreadContext context, RubyClass Queue2, RubyClass StopIteration2, RubyClass Object2) {
        return (RubyClass)Object2.setConstant(context, "ClosedQueueError", (IRubyObject)Queue2.defineClassUnder(context, "ClosedQueueError", StopIteration2, StopIteration2.getAllocator()));
    }

    @Override
    @JRubyMethod(visibility=Visibility.PRIVATE)
    public IRubyObject initialize(ThreadContext context) {
        this.capacity = Integer.MAX_VALUE;
        return this;
    }

    @JRubyMethod(visibility=Visibility.PRIVATE)
    public IRubyObject initialize(ThreadContext context, IRubyObject items) {
        this.capacity = Integer.MAX_VALUE;
        IRubyObject tmp = TypeConverter.convertToTypeWithCheck(context, items, Access.arrayClass(context), context.sites.TypeConverter.to_a_checked);
        if (tmp.isNil()) {
            throw Error.typeError(context, "can't convert ", items, " into Array");
        }
        RubyArray array2 = (RubyArray)tmp;
        for (int i2 = 0; i2 < array2.getLength(); ++i2) {
            this.push(context, (IRubyObject)array2.eltOk(i2));
        }
        return this;
    }

    @JRubyMethod
    public IRubyObject clear(ThreadContext context) {
        this.initializedCheck(context);
        try {
            this.clearInternal();
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "clear");
        }
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void clearInternal() throws InterruptedException {
        ReentrantLock putLock = this.putLock;
        ReentrantLock takeLock = this.takeLock;
        putLock.lockInterruptibly();
        try {
            takeLock.lockInterruptibly();
            try {
                Node p2;
                Node h = this.head;
                while ((p2 = h.next) != null) {
                    h.next = h;
                    p2.item = null;
                    h = p2;
                }
                this.head = this.last;
                if (this.count.getAndSet(0) == this.capacity) {
                    this.notFull.signal();
                }
            }
            finally {
                takeLock.unlock();
            }
        }
        finally {
            putLock.unlock();
        }
    }

    @Override
    @JRubyMethod(name={"freeze"})
    public final IRubyObject freeze(ThreadContext context) {
        throw Error.typeError(context, "cannot freeze " + String.valueOf(this));
    }

    @JRubyMethod(name={"empty?"})
    public RubyBoolean empty_p(ThreadContext context) {
        this.initializedCheck(context);
        return Convert.asBoolean(context, this.count.get() == 0);
    }

    @JRubyMethod(name={"length", "size"})
    public RubyNumeric length(ThreadContext context) {
        this.initializedCheck(context);
        return Convert.asFixnum(context, this.count.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @JRubyMethod
    public RubyNumeric num_waiting(ThreadContext context) {
        this.initializedCheck(context);
        ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            RubyFixnum rubyFixnum = Convert.asFixnum(context, takeLock.getWaitQueueLength(this.notEmpty));
            takeLock.unlock();
            return rubyFixnum;
        }
        catch (Throwable throwable) {
            try {
                takeLock.unlock();
                throw throwable;
            }
            catch (InterruptedException ie) {
                throw this.createInterruptedError(context, "num_waiting");
            }
        }
    }

    @JRubyMethod(name={"pop", "deq", "shift"})
    public IRubyObject pop(ThreadContext context) {
        this.initializedCheck(context);
        try {
            return context.getThread().executeTaskBlocking(context, this, BLOCKING_TAKE_TASK);
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "pop");
        }
    }

    @JRubyMethod(name={"pop", "deq", "shift"})
    public IRubyObject pop(ThreadContext context, IRubyObject nonblockOrOpts) {
        this.initializedCheck(context);
        boolean nonblock2 = false;
        long timeoutNS = 0L;
        RubyHash opts = ArgsUtil.extractKeywords(nonblockOrOpts);
        if (opts != null) {
            IRubyObject _timeout = ArgsUtil.extractKeywordArg(context, "timeout", opts);
            if (!_timeout.isNil() && (timeoutNS = Queue.queueTimeoutToNanos(context, _timeout)) == 0L && this.count.get() == 0) {
                return context.nil;
            }
        } else {
            nonblock2 = nonblockOrOpts.isTrue();
        }
        return this.popCommon(context, nonblock2, timeoutNS);
    }

    @JRubyMethod(name={"pop", "deq", "shift"})
    public IRubyObject pop(ThreadContext context, IRubyObject _nonblock, IRubyObject _opts) {
        this.initializedCheck(context);
        boolean nonblock2 = _nonblock.isTrue();
        long timeoutNS = 0L;
        IRubyObject _timeout = ArgsUtil.extractKeywordArg(context, "timeout", _opts);
        if (!_timeout.isNil()) {
            if (nonblock2) {
                throw Error.argumentError(context, "can't set a timeout if non_block is enabled");
            }
            timeoutNS = Queue.queueTimeoutToNanos(context, _timeout);
            if (timeoutNS == 0L && this.count.get() == 0) {
                return context.nil;
            }
        }
        return this.popCommon(context, nonblock2, timeoutNS);
    }

    private IRubyObject popCommon(ThreadContext context, boolean nonblock2, long timeoutNS) {
        try {
            if (nonblock2) {
                IRubyObject result2 = this.pollInternal();
                if (result2 == null) {
                    throw context.runtime.newThreadError("queue empty");
                }
                return result2;
            }
            BlockingPollTask task = timeoutNS != 0L ? new BlockingPollTask(timeoutNS) : BLOCKING_TAKE_TASK;
            return context.getThread().executeTaskBlocking(context, this, task);
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "pop");
        }
    }

    protected static long queueTimeoutToNanos(ThreadContext context, IRubyObject _timeout) {
        long l;
        if (_timeout.isNil()) {
            return 0L;
        }
        if (_timeout instanceof RubyFixnum) {
            RubyFixnum fixnum = (RubyFixnum)_timeout;
            l = TimeUnit.NANOSECONDS.convert(fixnum.getValue(), TimeUnit.SECONDS);
        } else {
            l = (long)(Convert.toDouble(context, _timeout) * 1.0E9);
        }
        return l;
    }

    @JRubyMethod(name={"push", "<<", "enq"})
    public IRubyObject push(ThreadContext context, IRubyObject value2) {
        this.initializedCheck(context);
        try {
            this.putInternal(context, value2);
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "push");
        }
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void putInternal(ThreadContext context, IRubyObject e) throws InterruptedException {
        int c;
        if (e == null) {
            throw new NullPointerException();
        }
        Node node = new Node(e);
        ReentrantLock putLock = this.putLock;
        AtomicInteger count2 = this.count;
        putLock.lockInterruptibly();
        try {
            boolean isClosed;
            while (!(isClosed = this.closed) && count2.get() >= this.capacity) {
                this.notFull.await();
            }
            if (isClosed) {
                this.notFull.signal();
                this.raiseClosedError(context);
            }
            this.enqueue(node);
            c = count2.getAndIncrement();
            if (c + 1 < this.capacity) {
                this.notFull.signal();
            }
        }
        finally {
            putLock.unlock();
        }
        if (c == 0) {
            this.signalNotEmpty();
        }
    }

    @JRubyMethod
    public IRubyObject marshal_dump(ThreadContext context) {
        return RubyMarshal.undumpable(context, this);
    }

    @JRubyMethod
    public IRubyObject close(ThreadContext context) {
        this.initializedCheck(context);
        try {
            this.closeInternal();
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "close");
        }
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void closeInternal() throws InterruptedException {
        ReentrantLock putLock = this.putLock;
        ReentrantLock takeLock = this.takeLock;
        AtomicInteger count2 = this.count;
        putLock.lockInterruptibly();
        try {
            takeLock.lockInterruptibly();
            try {
                if (this.closed) {
                    return;
                }
                this.closed = true;
                int c = count2.get();
                if (c >= this.capacity) {
                    this.notFull.signal();
                } else if (c == 0) {
                    this.notEmpty.signal();
                }
            }
            finally {
                takeLock.unlock();
            }
        }
        finally {
            putLock.unlock();
        }
    }

    @JRubyMethod(name={"closed?"})
    public IRubyObject closed_p(ThreadContext context) {
        this.initializedCheck(context);
        return Convert.asBoolean(context, this.closed);
    }

    public synchronized void shutdown() throws InterruptedException {
        this.closeInternal();
    }

    public boolean isShutdown() {
        return this.closed;
    }

    @Deprecated(since="10.0")
    public synchronized void checkShutdown() {
        if (this.isShutdown()) {
            Ruby runtime2 = this.getCurrentContext().runtime;
            throw RaiseException.from(runtime2, runtime2.getThreadError(), "queue shut down");
        }
    }

    protected long java_length() {
        return this.count.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected IRubyObject takeInternal(ThreadContext context) throws InterruptedException {
        IRubyObject x;
        int c = -1;
        AtomicInteger count2 = this.count;
        ReentrantLock takeLock = this.takeLock;
        boolean notFullSignalNeeded = false;
        takeLock.lockInterruptibly();
        try {
            boolean canDequeue;
            boolean isClosed;
            while (!(isClosed = this.closed) && count2.get() == 0) {
                this.notEmpty.await();
            }
            boolean bl = canDequeue = !isClosed || count2.get() != 0;
            if (canDequeue) {
                x = this.dequeue();
                c = count2.getAndDecrement();
            } else {
                x = context.nil;
            }
            if (c > 1 || isClosed) {
                this.notEmpty.signal();
            }
            if (canDequeue) {
                notFullSignalNeeded = c == this.capacity;
            }
        }
        finally {
            takeLock.unlock();
        }
        if (notFullSignalNeeded) {
            this.signalNotFull();
        }
        return x;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public IRubyObject pollInternal() throws InterruptedException {
        int c;
        IRubyObject x;
        AtomicInteger count2 = this.count;
        if (count2.get() == 0) {
            return null;
        }
        ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count2.get() == 0) {
                IRubyObject iRubyObject = null;
                return iRubyObject;
            }
            x = this.dequeue();
            c = count2.getAndDecrement();
            if (c > 1) {
                this.notEmpty.signal();
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == this.capacity) {
            this.signalNotFull();
        }
        return x;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public IRubyObject pollInternal(long timeout2, TimeUnit unit) throws InterruptedException {
        int c;
        IRubyObject x;
        long nanos = unit.toNanos(timeout2);
        AtomicInteger count2 = this.count;
        ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count2.get() == 0) {
                if (nanos <= 0L) {
                    IRubyObject iRubyObject = null;
                    return iRubyObject;
                }
                nanos = this.notEmpty.awaitNanos(nanos);
            }
            x = this.dequeue();
            c = count2.getAndDecrement();
            if (c > 1) {
                this.notEmpty.signal();
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == this.capacity) {
            this.signalNotFull();
        }
        return x;
    }

    public IRubyObject raiseClosedError(ThreadContext context) {
        throw context.runtime.newRaiseException(context.runtime.getClosedQueueError(), "queue closed");
    }

    protected RaiseException createInterruptedError(ThreadContext context, String methodName) {
        return context.runtime.newThreadError("interrupted in " + this.getMetaClass().getName(context) + "#" + methodName);
    }

    static class Node {
        IRubyObject item;
        Node next;

        Node(IRubyObject x) {
            this.item = x;
        }
    }

    private static class BlockingPollTask
    implements RubyThread.Task<Queue, IRubyObject> {
        private final long timeoutNS;

        public BlockingPollTask(long timeoutNS) {
            this.timeoutNS = timeoutNS;
        }

        @Override
        public IRubyObject run(ThreadContext context, Queue queue) throws InterruptedException {
            IRubyObject result2;
            IRubyObject iRubyObject = result2 = this.timeoutNS == 0L ? queue.pollInternal() : queue.pollInternal(this.timeoutNS, TimeUnit.NANOSECONDS);
            if (result2 == null && this.timeoutNS == 0L) {
                throw context.runtime.newThreadError("queue empty");
            }
            return Helpers.nullToNil(result2, context.nil);
        }

        @Override
        public void wakeup(RubyThread thread2, Queue queue) {
            thread2.getNativeThread().interrupt();
        }
    }
}

