-
Notifications
You must be signed in to change notification settings - Fork 1
/
pdsim_unity.py
178 lines (151 loc) · 6.19 KB
/
pdsim_unity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import json
import zmq
import signal
import time
import argparse
import unified_planning as up
from unified_planning.io.pddl_reader import PDDLReader
from unified_planning.model import Problem
from unified_planning.engines import PlanGenerationResult, CompilationKind
from unified_planning.shortcuts import OneshotPlanner, get_all_applicable_engines, Compiler
from unified_planning.grpc.proto_writer import ProtobufWriter
class PdSimUnityServer:
def __init__(self, problem_model: Problem, plan_result: PlanGenerationResult, host, port):
self.host = host
self.port = port
self.proto_writer = ProtobufWriter()
self.problem = problem_model
self.plan_result = plan_result
def info(self):
print("####################################")
print("##### PDSim Server #####")
print("####################################")
print(f"--- Listening on {self.host}:{self.port} ---")
print("####################################")
def convert_to_protobuf(self, model):
try:
parse = self.proto_writer.convert(model)
except Exception as exception:
print(exception)
return None
return parse.SerializeToString()
def server_loop(self):
context = zmq.Context()
socket: zmq.Socket = context.socket(zmq.REP)
socket.bind('tcp://{}:{}'.format(self.host, self.port))
signal.signal(signal.SIGINT, signal.SIG_DFL)
self.info()
while True:
request = socket.recv_json()
if request['request'] == 'ping':
socket.send_json({'status': 'OK'})
elif request['request'] == 'problem':
converted_problem = self.convert_to_protobuf(self.problem)
if converted_problem is not None:
print(converted_problem)
socket.send(converted_problem)
elif request['request'] == 'plan':
converted_plan = self.convert_to_protobuf(self.plan_result)
if converted_plan is not None:
socket.send(converted_plan)
else:
print("Unknown request")
time.sleep(0.1)
def usage():
# host and port are optional
print('''Usage: python
pdsim_unity.py --domain <domain_file>
--problem <problem_file>
[--planner <planner_name>]
[--host <host_address>]
[--port <port>]
''')
def compile_problem(problem):
# Remove quantifiers and conditional effects
with Compiler(
problem_kind=problem.kind,
compilation_kind=CompilationKind.QUANTIFIERS_REMOVING) as compiler:
compilation_result = compiler.compile(problem)
with Compiler(
problem_kind=problem.kind,
compilation_kind=CompilationKind.CONDITIONAL_EFFECTS_REMOVING) as compiler:
compilation_result = compiler.compile(compilation_result.problem)
return compilation_result.problem
def select_planner(planners):
print("Select planner:")
for i, p in enumerate(planners):
print(f"{i}. {p}")
print("Enter number:")
while True:
try:
planner_index = int(input())
if planner_index < 0 or planner_index >= len(planners):
raise ValueError
break
except ValueError:
print("Invalid input, try again")
return planners[planner_index]
def solve_problem(problem, planner_name):
applicable_planners = get_all_applicable_engines(problem.kind)
if len(applicable_planners) == 0:
print(f"No planners applicable to problem {problem.kind}")
exit(1)
if planner_name not in applicable_planners:
print(f"Planner {planner_name} not applicable to problem {problem.kind}")
if len(applicable_planners) == 1:
planner_name = applicable_planners[0]
print(f"Using {applicable_planners[0]}")
else:
planner_name = select_planner(applicable_planners)
print(f"Using {planner_name}")
planner = OneshotPlanner(name=planner_name)
return planner.solve(problem)
def launch_server(problem, result, host, port):
server = PdSimUnityServer(problem, result, host, port)
server.server_loop()
def pdsim_pddl(domain_path, problem_path, planner_name, host='127.0.0.1', port='5556'):
try:
print("Parsing domain")
problem_pddl = PDDLReader().parse_problem(domain_path, problem_path)
print("Parsing Complete")
problem_pddl = compile_problem(problem_pddl)
except Exception as exception:
print("Error parsing problem")
print(exception)
exit(1)
try:
result = solve_problem(problem_pddl, planner_name)
if result.plan is None:
print("No plan found")
exit(1)
except Exception as exception:
print("Error solving problem")
print(exception)
exit(1)
launch_server(problem_pddl, result, host, port)
def pdsim_upf(problem_upf, planner_name, host='127.0.0.1', port='5556'):
up.shortcuts.get_environment().credits_stream = None
try:
result = solve_problem(problem_upf, planner_name)
if result.plan is None:
print("No plan found")
exit(1)
except Exception as exception:
print(exception)
exit(1)
launch_server(problem_upf, result, host, port)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='PDSim server')
parser.add_argument('--domain', type=str, help='Domain file')
parser.add_argument('--problem', type=str, help='Problem file')
parser.add_argument('--planner', type=str, default='fast-downward', help='Planner name')
parser.add_argument('--host', type=str, default='127.0.0.1', help='Host address')
parser.add_argument('--port', type=str, default='5556', help='Port')
args = parser.parse_args()
# error checking args
if args.domain is None or args.problem is None:
usage()
exit(1)
else:
# run server
pdsim_pddl(args.domain, args.problem, args.planner, args.host, args.port)