3
3
import com .fasterxml .jackson .core .type .TypeReference ;
4
4
import com .fasterxml .jackson .databind .JsonNode ;
5
5
import com .fasterxml .jackson .databind .ObjectMapper ;
6
+ import com .fasterxml .jackson .databind .node .ObjectNode ;
6
7
import com .google .common .base .Throwables ;
7
8
import com .google .common .collect .ImmutableList ;
9
+ import com .google .common .collect .Maps ;
8
10
import com .google .inject .Inject ;
9
11
import io .digdag .client .config .Config ;
10
- import io .digdag .client .config .ConfigException ;
12
+ import io .digdag .client .config .ConfigElement ;
11
13
import io .digdag .client .config .ConfigFactory ;
14
+ import io .digdag .spi .CommandContext ;
12
15
import io .digdag .spi .CommandExecutor ;
16
+ import io .digdag .spi .CommandRequest ;
17
+ import io .digdag .spi .CommandStatus ;
13
18
import io .digdag .spi .Operator ;
14
19
import io .digdag .spi .OperatorContext ;
15
20
import io .digdag .spi .OperatorFactory ;
16
- import io .digdag .spi .PrivilegedVariables ;
17
21
import io .digdag .spi .TaskExecutionException ;
18
22
import io .digdag .spi .TaskResult ;
19
23
import io .digdag .util .BaseOperator ;
24
+ import io .digdag .util .CommandOperators ;
20
25
import io .digdag .util .UserSecretTemplate ;
21
- import java .io .BufferedWriter ;
26
+ import java .io .ByteArrayOutputStream ;
22
27
import java .io .IOException ;
23
- import java .io .OutputStreamWriter ;
28
+ import java .io .PrintStream ;
24
29
import java .io .Writer ;
30
+ import java .nio .charset .StandardCharsets ;
31
+ import java .nio .file .Files ;
32
+ import java .nio .file .Path ;
33
+ import java .time .Duration ;
25
34
import java .util .ArrayList ;
26
35
import java .util .HashMap ;
27
36
import java .util .LinkedList ;
28
37
import java .util .List ;
29
38
import java .util .Map ;
30
- import java .util .regex .Pattern ;
31
- import org .apache .commons .io .IOUtils ;
32
39
import org .slf4j .Logger ;
33
40
import org .slf4j .LoggerFactory ;
34
41
@@ -37,8 +44,6 @@ public class ShResultOperatorFactory implements OperatorFactory {
37
44
38
45
private static Logger logger = LoggerFactory .getLogger (ShResultOperatorFactory .class );
39
46
40
- private static Pattern VALID_ENV_KEY = Pattern .compile ("[a-zA-Z_][a-zA-Z_0-9]*" );
41
-
42
47
private final CommandExecutor exec ;
43
48
44
49
@ Inject
@@ -58,93 +63,164 @@ public Operator newOperator(OperatorContext operatorContext) {
58
63
59
64
class ShResultOperator extends BaseOperator {
60
65
66
+ // TODO extract as config params.
67
+ final int scriptPollInterval = (int ) Duration .ofSeconds (10 ).getSeconds ();
68
+
61
69
public ShResultOperator (OperatorContext context ) {
62
70
super (context );
63
71
}
64
72
65
73
@ Override
66
74
public TaskResult runTask () {
67
- Config params = request .getConfig ()
75
+ final Config state = request .getConfig ();
76
+ Config params = state .mergeDefault (request .getConfig ().getNestedOrGetEmpty ("sh" ));
77
+
78
+ // save System.out
79
+ PrintStream console = System .out ;
80
+
81
+ try {
82
+ // prepare capture stdout
83
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream ();
84
+ final String utf8 = StandardCharsets .UTF_8 .name ();
85
+ PrintStream ps = new PrintStream (baos );
86
+ System .setOut (ps );
87
+
88
+ // run script
89
+ runCode (state );
90
+
91
+ // capture stdout
92
+ String stdoutData = baos .toString (utf8 );
93
+
94
+ String varName = params .get ("destination_variable" , String .class );
95
+ String stdoutFormat = params .get ("stdout_format" , String .class );
96
+
97
+ ConfigFactory cf = request .getConfig ().getFactory ();
98
+ Config storeParams = cf .create ();
99
+ storeParams .set (varName , createVariableObjectFromStdout (stdoutData , stdoutFormat ));
100
+
101
+ return TaskResult .defaultBuilder (request )
102
+ .storeParams (storeParams )
103
+ .build ();
104
+ } catch (IOException | InterruptedException e ) {
105
+ throw Throwables .propagate (e );
106
+ } finally {
107
+ // restore System.out
108
+ System .setOut (console );
109
+ }
110
+ }
111
+
112
+ private void runCode (final Config state )
113
+ throws IOException , InterruptedException {
114
+ final Config params = request .getConfig ()
68
115
.mergeDefault (request .getConfig ().getNestedOrGetEmpty ("sh" ));
116
+ final Path projectPath = workspace .getProjectPath ();
117
+ final CommandContext commandContext = buildCommandContext (projectPath );
118
+
119
+ final CommandStatus status ;
120
+ if (!state .has ("commandStatus" )) {
121
+ // Run the code since command state doesn't exist
122
+ status = runCommand (params , commandContext );
123
+ } else {
124
+ // Check the status of the running command
125
+ final ObjectNode previousStatusJson = state .get ("commandStatus" , ObjectNode .class );
126
+ status = exec .poll (commandContext , previousStatusJson );
127
+ }
69
128
70
- List <String > shell = params .getListOrEmpty ("shell" , String .class );
71
- if (shell .isEmpty ()) {
129
+ if (status .isFinished ()) {
130
+ final int statusCode = status .getStatusCode ();
131
+ if (statusCode != 0 ) {
132
+ // Remove the polling state after fetching the result so that the result fetch can be retried
133
+ // without resubmitting the code.
134
+ state .remove ("commandStatus" );
135
+ throw new RuntimeException ("Command failed with code " + statusCode );
136
+ }
137
+ return ;
138
+ } else {
139
+ state .set ("commandStatus" , status );
140
+ throw TaskExecutionException .ofNextPolling (scriptPollInterval , ConfigElement .copyOf (state ));
141
+ }
142
+ }
143
+
144
+ private CommandStatus runCommand (final Config params , final CommandContext commandContext )
145
+ throws IOException , InterruptedException {
146
+ final Path tempDir = workspace
147
+ .createTempDir (String .format ("digdag-sh-%d-" , request .getTaskId ()));
148
+ final Path workingDirectory = workspace .getPath (); // absolute
149
+ final Path runnerPath = tempDir .resolve ("runner.sh" ); // absolute
150
+
151
+ final List <String > shell ;
152
+ if (params .has ("shell" )) {
153
+ shell = params .getListOrEmpty ("shell" , String .class );
154
+ } else {
72
155
shell = ImmutableList .of ("/bin/sh" );
73
156
}
74
- String command = UserSecretTemplate .of (params .get ("_command" , String .class ))
75
- .format (context .getSecrets ());
76
157
77
- ProcessBuilder pb = new ProcessBuilder (shell );
78
- pb .directory (workspace .getPath ().toFile ());
158
+ final ImmutableList .Builder <String > cmdline = ImmutableList .builder ();
159
+ if (params .has ("shell" )) {
160
+ cmdline .addAll (shell );
161
+ } else {
162
+ cmdline .addAll (shell );
163
+ }
164
+ cmdline .add (workingDirectory .relativize (runnerPath ).toString ()); // relative
165
+
166
+ final String shScript = UserSecretTemplate .of (params .get ("_command" , String .class ))
167
+ .format (context .getSecrets ());
79
168
80
- final Map <String , String > env = pb . environment ();
169
+ final Map <String , String > environments = Maps . newHashMap ();
81
170
params .getKeys ()
82
171
.forEach (key -> {
83
- if (isValidEnvKey (key )) {
172
+ if (CommandOperators . isValidEnvKey (key )) {
84
173
JsonNode value = params .get (key , JsonNode .class );
85
174
String string ;
86
175
if (value .isTextual ()) {
87
176
string = value .textValue ();
88
177
} else {
89
178
string = value .toString ();
90
179
}
91
- env .put (key , string );
180
+ environments .put (key , string );
92
181
} else {
93
182
logger .trace ("Ignoring invalid env var key: {}" , key );
94
183
}
95
184
});
96
185
97
186
// Set up process environment according to env config. This can also refer to secrets.
98
- collectEnvironmentVariables (env , context .getPrivilegedVariables ());
99
-
100
- String stdoutData ;
101
- int ecode ;
102
- try {
103
- Process p = exec .start (workspace .getPath (), request , pb );
104
-
105
- // feed command to stdin
106
- try (Writer writer = new BufferedWriter (new OutputStreamWriter (p .getOutputStream ()))) {
107
- writer .write (command );
108
- }
109
-
110
- ecode = p .waitFor ();
111
-
112
- // keep stdout
113
- stdoutData = IOUtils .toString (p .getInputStream ());
114
-
115
- // dump stderr
116
- String stderrData = IOUtils .toString (p .getErrorStream ());
117
- logger .info (stderrData );
187
+ CommandOperators .collectEnvironmentVariables (environments , context .getPrivilegedVariables ());
118
188
119
- } catch (IOException | InterruptedException ex ) {
120
- throw Throwables .propagate (ex );
189
+ // Write script content to runnerPath
190
+ try (Writer writer = Files .newBufferedWriter (runnerPath )) {
191
+ writer .write (shScript );
121
192
}
122
193
123
- if (ecode != 0 ) {
124
- throw new TaskExecutionException ("Command failed with code " + ecode );
125
- }
126
-
127
- String varName = params .get ("destination_variable" , String .class );
128
- String stdoutFormat = params .get ("stdout_format" , String .class );
129
-
130
- ConfigFactory cf = request .getConfig ().getFactory ();
131
- Config storeParams = cf .create ();
194
+ final CommandRequest commandRequest = buildCommandRequest (commandContext , workingDirectory ,
195
+ tempDir , environments , cmdline .build ());
196
+ return exec .run (commandContext , commandRequest );
132
197
133
- storeParams .set (varName , createVariableObjectFromStdout (stdoutData , stdoutFormat ));
198
+ // TaskExecutionException could not be thrown here to poll the task by non-blocking for process-base
199
+ // command executor. Because they will be bounded by the _instance_ where the command was executed
200
+ // first.
201
+ }
134
202
135
- return TaskResult .defaultBuilder (request )
136
- .storeParams (storeParams )
203
+ private CommandContext buildCommandContext (final Path projectPath ) {
204
+ return CommandContext .builder ()
205
+ .localProjectPath (projectPath )
206
+ .taskRequest (this .request )
137
207
.build ();
138
208
}
139
- }
140
209
141
- public static void collectEnvironmentVariables (Map <String , String > env ,
142
- PrivilegedVariables variables ) {
143
- for (String name : variables .getKeys ()) {
144
- if (!VALID_ENV_KEY .matcher (name ).matches ()) {
145
- throw new ConfigException ("Invalid _env key name: " + name );
146
- }
147
- env .put (name , variables .get (name ));
210
+ private CommandRequest buildCommandRequest (final CommandContext commandContext ,
211
+ final Path workingDirectory ,
212
+ final Path tempDir ,
213
+ final Map <String , String > environments ,
214
+ final List <String > cmdline ) {
215
+ final Path projectPath = commandContext .getLocalProjectPath ();
216
+ final Path relativeWorkingDirectory = projectPath .relativize (workingDirectory ); // relative
217
+ final Path ioDirectory = projectPath .relativize (tempDir ); // relative
218
+ return CommandRequest .builder ()
219
+ .workingDirectory (relativeWorkingDirectory )
220
+ .environments (environments )
221
+ .commandLine (cmdline )
222
+ .ioDirectory (ioDirectory )
223
+ .build ();
148
224
}
149
225
}
150
226
@@ -190,9 +266,4 @@ public static Object createVariableObjectFromStdout(
190
266
}
191
267
return null ;
192
268
}
193
-
194
- private static boolean isValidEnvKey (String key ) {
195
- return VALID_ENV_KEY .matcher (key ).matches ();
196
- }
197
-
198
269
}
0 commit comments