/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.agents.runtime.python.utils;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.agents.plan.PythonFunction;
import org.apache.flink.agents.runtime.context.RunnerContextImpl;
import org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment;
import org.apache.flink.agents.runtime.env.PythonEnvironmentManager;
import org.apache.flink.agents.runtime.python.event.PythonEvent;
import org.apache.flink.util.Preconditions;
import pemja.core.PythonInterpreter;

/**
 * Executes Python actions through an embedded {@link PythonInterpreter}.
 *
 * <p>{@link #open()} obtains the interpreter from the {@link PythonEnvironmentManager}, runs the
 * required Python imports and creates an async thread pool object on the Python side. Python
 * objects that must be looked up again later (the thread pool, runner contexts, generators) are
 * stored in the interpreter under generated variable names whose numeric suffixes come from
 * static counters shared by all instances of this class.
 *
 * <p>NOTE(review): this class never closes the interpreter or the environment manager; presumably
 * the owner of the {@link PythonEnvironmentManager} is responsible for that - confirm.
 */
public class PythonActionExecutor {
    /** Python modules that every other call in this class relies on. */
    private static final String PYTHON_IMPORTS =
            "from flink_agents.plan import function\n"
                    + "from flink_agents.runtime import flink_runner_context\n"
                    + "from flink_agents.runtime import python_java_utils";

    // Runner context: Python factory function plus the naming scheme of the interpreter variable.
    private static final String CREATE_FLINK_RUNNER_CONTEXT = "flink_runner_context.create_flink_runner_context";
    private static final String FLINK_RUNNER_CONTEXT_REF_NAME_PREFIX = "flink_runner_context_";
    private static final AtomicLong FLINK_RUNNER_CONTEXT_REF_ID = new AtomicLong(0L);

    // Async thread pool: Python create/close functions plus the naming scheme of the variable.
    private static final String CREATE_ASYNC_THREAD_POOL = "flink_runner_context.create_async_thread_pool";
    private static final String CLOSE_ASYNC_THREAD_POOL = "flink_runner_context.close_async_thread_pool";
    private static final String PYTHON_ASYNC_THREAD_POOL_REF_NAME = "python_async_thread_pool";
    private static final AtomicLong PYTHON_ASYNC_THREAD_POOL_REF_ID = new AtomicLong(0L);

    // Generators: Python driver function plus the naming scheme of the interpreter variable.
    private static final String CALL_PYTHON_GENERATOR = "function.call_python_generator";
    private static final String PYTHON_GENERATOR_VAR_NAME_PREFIX = "python_generator_";
    private static final AtomicLong PYTHON_GENERATOR_VAR_ID = new AtomicLong(0L);

    // Conversion helpers living in the python_java_utils module.
    private static final String CONVERT_TO_PYTHON_OBJECT = "python_java_utils.convert_to_python_object";
    private static final String WRAP_TO_INPUT_EVENT = "python_java_utils.wrap_to_input_event";
    private static final String GET_OUTPUT_FROM_OUTPUT_EVENT = "python_java_utils.get_output_from_output_event";

    private final PythonEnvironmentManager environmentManager;
    private final String agentPlanJson;

    /** Interpreter obtained in {@link #open()}; {@code null} until then. */
    private PythonInterpreter interpreter;

    /**
     * Name of the interpreter variable holding the Python async thread pool. Non-null only between
     * a successful {@link #open()} and the next {@link #close()}.
     */
    private String pythonAsyncThreadPoolObjectName;

    /**
     * Creates an executor; no Python resources are touched until {@link #open()} is called.
     *
     * @param environmentManager manager used to create the embedded Python environment
     * @param agentPlanJson agent plan JSON passed to every Python runner context that is created
     */
    public PythonActionExecutor(PythonEnvironmentManager environmentManager, String agentPlanJson) {
        this.environmentManager = environmentManager;
        this.agentPlanJson = agentPlanJson;
    }

    /**
     * Opens the environment manager, obtains the interpreter, runs the Python imports and creates
     * the Python async thread pool, registering it in the interpreter under a generated name.
     *
     * @throws Exception if opening the environment or any interpreter call fails
     */
    public void open() throws Exception {
        this.environmentManager.open();
        EmbeddedPythonEnvironment env = this.environmentManager.createEnvironment();
        this.interpreter = env.getInterpreter();
        this.interpreter.exec(PYTHON_IMPORTS);

        Object pythonAsyncThreadPool = this.interpreter.invoke(CREATE_ASYNC_THREAD_POOL, new Object[0]);
        String poolRefName = PYTHON_ASYNC_THREAD_POOL_REF_NAME + PYTHON_ASYNC_THREAD_POOL_REF_ID.incrementAndGet();
        this.interpreter.set(poolRefName, pythonAsyncThreadPool);
        // Publish the name only once the variable really exists in the interpreter, so that
        // close() never looks up a variable that was not registered.
        this.pythonAsyncThreadPoolObjectName = poolRefName;
    }

    /**
     * Calls the given Python function with the converted event and a freshly created Python runner
     * context.
     *
     * <p>The runner context is stored in the interpreter under a generated name. NOTE(review):
     * neither that variable nor the generator variable is removed by this class - presumably they
     * are kept to pin the objects' lifetime; confirm who cleans them up.
     *
     * @param function Python function to call; this executor's interpreter is set on it first
     * @param event event whose payload is converted to a Python object and passed to the function
     * @param runnerContext Java runner context handed to the Python runner context factory
     * @return the name of the interpreter variable holding the function's non-null result, or
     *     {@code null} if the function returned nothing
     * @throws IllegalStateException if the executor has not been opened or has been closed
     * @throws PythonActionExecutionException if calling the function or storing its result fails
     * @throws Exception if preparing the call (runner context creation, event conversion) fails
     */
    public String executePythonFunction(PythonFunction function, PythonEvent event, RunnerContextImpl runnerContext) throws Exception {
        Preconditions.checkState(
                this.pythonAsyncThreadPoolObjectName != null,
                "PythonActionExecutor is not open; call open() before executing Python functions");
        runnerContext.checkNoPendingEvents();
        function.setInterpreter(this.interpreter);

        Object asyncThreadPool = this.interpreter.get(this.pythonAsyncThreadPoolObjectName);
        Object pythonRunnerContextObject = this.interpreter.invoke(
                CREATE_FLINK_RUNNER_CONTEXT, new Object[]{runnerContext, this.agentPlanJson, asyncThreadPool});
        String pythonRunnerContextObjectName = FLINK_RUNNER_CONTEXT_REF_NAME_PREFIX + FLINK_RUNNER_CONTEXT_REF_ID.incrementAndGet();
        this.interpreter.set(pythonRunnerContextObjectName, pythonRunnerContextObject);

        Object pythonEventObject = this.interpreter.invoke(CONVERT_TO_PYTHON_OBJECT, new Object[]{event.getEvent()});
        try {
            Object calledResult = function.call(pythonEventObject, pythonRunnerContextObject);
            if (calledResult == null) {
                return null;
            }
            // A non-null result is kept in the interpreter so callPythonGenerator can find it.
            String pythonGeneratorRef = PYTHON_GENERATOR_VAR_NAME_PREFIX + PYTHON_GENERATOR_VAR_ID.incrementAndGet();
            this.interpreter.set(pythonGeneratorRef, calledResult);
            return pythonGeneratorRef;
        }
        catch (Exception e) {
            // Drain the runner context's events before reporting the failure.
            runnerContext.drainEvents(null);
            throw new PythonActionExecutionException("Failed to execute Python action", e);
        }
    }

    /**
     * Wraps raw event data into a {@link PythonEvent} via the Python {@code wrap_to_input_event}
     * helper.
     *
     * @param eventData event payload; must be a {@code byte[]}
     * @return the wrapped event
     * @throws IllegalStateException if {@code eventData} is not a {@code byte[]}
     */
    public PythonEvent wrapToInputEvent(Object eventData) {
        Preconditions.checkState(eventData instanceof byte[], "eventData must be a byte[]");
        byte[] wrappedEvent = (byte[]) this.interpreter.invoke(WRAP_TO_INPUT_EVENT, new Object[]{eventData});
        return new PythonEvent(wrappedEvent, "flink_agents.api.events.event.InputEvent");
    }

    /**
     * Extracts the output from a Python output event via the Python
     * {@code get_output_from_output_event} helper.
     *
     * @param pythonOutputEvent the Python output event as bytes
     * @return whatever the Python helper returns
     */
    public Object getOutputFromOutputEvent(byte[] pythonOutputEvent) {
        return this.interpreter.invoke(GET_OUTPUT_FROM_OUTPUT_EVENT, new Object[]{pythonOutputEvent});
    }

    /**
     * Calls the Python {@code call_python_generator} function on the generator stored under the
     * given interpreter variable name.
     *
     * @param pythonGeneratorRef variable name previously returned by
     *     {@link #executePythonFunction(PythonFunction, PythonEvent, RunnerContextImpl)}
     * @return the first element of the two-element result; presumably {@code true} once the
     *     generator has finished - TODO confirm against {@code function.call_python_generator}
     * @throws IllegalStateException if the Python call does not return a two-element object array
     *     whose first element is a {@link Boolean}
     */
    public boolean callPythonGenerator(String pythonGeneratorRef) {
        Object pythonGenerator = this.interpreter.get(pythonGeneratorRef);
        Object invokeResult = this.interpreter.invoke(CALL_PYTHON_GENERATOR, new Object[]{pythonGenerator});

        // instanceof also rejects null and primitive arrays, which previously escaped the state
        // check as NullPointerException / ClassCastException.
        Preconditions.checkState(
                invokeResult instanceof Object[] && ((Object[]) invokeResult).length == 2,
                "Expected " + CALL_PYTHON_GENERATOR + " to return a two-element array");
        Object firstElement = ((Object[]) invokeResult)[0];
        Preconditions.checkState(
                firstElement instanceof Boolean,
                "Expected the first element returned by " + CALL_PYTHON_GENERATOR + " to be a boolean");
        return (Boolean) firstElement;
    }

    /**
     * Shuts down the Python async thread pool created in {@link #open()}, if there is one.
     *
     * <p>Calling this method again without a new {@link #open()} is a no-op. The interpreter and
     * the environment manager are not closed here.
     *
     * @throws Exception if the Python shutdown call fails
     */
    public void close() throws Exception {
        String poolRefName = this.pythonAsyncThreadPoolObjectName;
        if (poolRefName == null) {
            return;
        }
        // Forget the pool before shutting it down so a repeated close() cannot close it twice.
        this.pythonAsyncThreadPoolObjectName = null;
        this.interpreter.invoke(CLOSE_ASYNC_THREAD_POOL, new Object[]{this.interpreter.get(poolRefName)});
    }

    /** Signals that calling a Python action, or storing its result, failed. */
    public static class PythonActionExecutionException
    extends Exception {
        /**
         * @param message description of the failure
         * @param cause the underlying exception
         */
        public PythonActionExecutionException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}

