Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangshenghang committed Jan 17, 2025
1 parent 68ffa3f commit d49f500
Show file tree
Hide file tree
Showing 33 changed files with 718 additions and 297 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.config;

import org.apache.seatunnel.app.dal.entity.User;
import org.apache.seatunnel.app.security.UserContext;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("AsyncThread-");
executor.setTaskDecorator(new ContextCopyingDecorator());
executor.initialize();
return executor;
}

@Override
public Executor getAsyncExecutor() {
return taskExecutor();
}

public static class ContextCopyingDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
try {
User user = UserContext.getUser();
return () -> {
try {
UserContext.setUser(user);
runnable.run();
} finally {
UserContext.clear();
}
};
} catch (Exception e) {
return runnable;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.config;

import org.apache.seatunnel.app.interceptor.UserContextInterceptor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
* Web MVC configuration for registering interceptors. This configuration ensures proper handling of
* user context in web requests.
*/
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

/**
* Creates and registers the UserContextInterceptor bean.
*
* @return A new instance of UserContextInterceptor
*/
@Bean
public UserContextInterceptor userContextInterceptor() {
return new UserContextInterceptor();
}

/**
* Configures the interceptors for the application. UserContextInterceptor is registered with a
* high order value to ensure it executes after the authentication interceptor.
*
* @param registry The interceptor registry
*/
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(userContextInterceptor())
.addPathPatterns("/**")
.order(100); // High order value ensures execution after authentication
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
Expand All @@ -44,11 +43,10 @@ public class JobConfigController {
@PutMapping("/{jobVersionId}")
@ApiOperation(value = "update job config", httpMethod = "PUT")
Result<Void> updateJobConfig(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobVersionId", required = true) @PathVariable long jobVersionId,
@ApiParam(value = "jobConfig", required = true) @RequestBody JobConfig jobConfig)
throws JsonProcessingException {
jobConfigService.updateJobConfig(userId, jobVersionId, jobConfig);
jobConfigService.updateJobConfig(jobVersionId, jobConfig);
return Result.success();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
Expand All @@ -47,33 +46,29 @@ public class JobController {
value =
"Create a job, In jobDAG for inputPluginId and targetPluginId use the plugin names instead of ids.",
httpMethod = "POST")
public Result<Long> createJob(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@RequestBody JobCreateReq jobCreateRequest)
public Result<Long> createJob(@RequestBody JobCreateReq jobCreateRequest)
throws JsonProcessingException {
return Result.success(jobCRUDService.createJob(userId, jobCreateRequest));
return Result.success(jobCRUDService.createJob(jobCreateRequest));
}

@PutMapping("/update/{jobVersionId}")
@ApiOperation(
value = "Update a job, all the existing ids should be passed in the request.",
httpMethod = "PUT")
public Result<Void> updateJob(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobVersionId", required = true) @PathVariable long jobVersionId,
@RequestBody JobCreateReq jobCreateReq)
throws JsonProcessingException {
jobCRUDService.updateJob(userId, jobVersionId, jobCreateReq);
jobCRUDService.updateJob(jobVersionId, jobCreateReq);
return Result.success();
}

@GetMapping("/get/{jobVersionId}")
@ApiOperation(value = "Get a job detail.", httpMethod = "GET")
public Result<JobRes> getJob(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobVersionId", required = true) @PathVariable long jobVersionId)
throws JsonProcessingException {
JobRes jobRes = jobCRUDService.getJob(userId, jobVersionId);
JobRes jobRes = jobCRUDService.getJob(jobVersionId);
return Result.success(jobRes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
Expand All @@ -57,12 +56,10 @@ public class JobDefinitionController {
*/
@PostMapping
@ApiOperation(value = "create job definition", httpMethod = "POST")
Result<Long> createJobDefinition(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@RequestBody JobReq jobReq)
Result<Long> createJobDefinition(@RequestBody JobReq jobReq)
throws CodeGenerateUtils.CodeGenerateException {
if (jobService.getJob(jobReq.getName()).isEmpty()) {
return Result.success(jobService.createJob(userId, jobReq));
return Result.success(jobService.createJob(jobReq));
} else {
return Result.failure(SeatunnelErrorEnum.TASK_NAME_ALREADY_EXISTS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
Expand All @@ -57,22 +56,20 @@ public class JobExecutorController {
@PostMapping("/execute")
@ApiOperation(value = "Execute synchronization tasks", httpMethod = "POST")
public Result<Long> jobExecutor(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobDefineId", required = true) @RequestParam("jobDefineId")
Long jobDefineId,
@RequestBody(required = false) JobExecParam executeParam) {
return jobExecutorService.jobExecute(userId, jobDefineId, executeParam);
return jobExecutorService.jobExecute(jobDefineId, executeParam);
}

@GetMapping("/resource")
@ApiOperation(value = "get the resource for job executor", httpMethod = "GET")
public Result<JobExecutorRes> resource(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@ApiParam(value = "Job define id", required = true) @RequestParam Long jobDefineId)
throws IOException {
try {
JobExecutorRes executeResource =
jobInstanceService.createExecuteResource(userId, jobDefineId, null);
jobInstanceService.createExecuteResource(jobDefineId, null);
return Result.success(executeResource);
} catch (Exception e) {
log.error("Get the resource for job executor error", e);
Expand All @@ -82,39 +79,34 @@ public Result<JobExecutorRes> resource(

@GetMapping("/pause")
public Result<Void> jobPause(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) {
return jobExecutorService.jobPause(userId, jobInstanceId);
return jobExecutorService.jobPause(jobInstanceId);
}

@GetMapping("/restore")
public Result<Void> jobRestore(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) {
return jobExecutorService.jobStore(userId, jobInstanceId);
return jobExecutorService.jobStore(jobInstanceId);
}

@GetMapping("/status")
@ApiOperation(value = "get job execution status", httpMethod = "GET")
Result<JobExecutionStatus> getJobExecutionStatus(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) {
return taskInstanceService.getJobExecutionStatus(userId, jobInstanceId);
return taskInstanceService.getJobExecutionStatus(jobInstanceId);
}

@GetMapping("/detail")
@ApiOperation(value = "get job execution status and some more details", httpMethod = "GET")
Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) {
return taskInstanceService.getJobExecutionDetail(userId, jobInstanceId);
return taskInstanceService.getJobExecutionDetail(jobInstanceId);
}

@DeleteMapping("/delete")
@ApiOperation(value = "Deletes given job instance id", httpMethod = "DELETE")
Result<Void> deleteJobInstance(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) {
return taskInstanceService.deleteJobInstanceById(userId, jobInstanceId);
return taskInstanceService.deleteJobInstanceById(jobInstanceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.seatunnel.app.service.IJobMetricsService;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
Expand All @@ -48,31 +47,25 @@ public class JobMetricsController {
@GetMapping("/detail")
@ApiOperation(value = "get the job pipeline detail metrics", httpMethod = "GET")
public Result<List<JobPipelineDetailMetricsRes>> detail(
@ApiParam(value = "userId", required = true) @RequestAttribute Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId)
throws IOException {

return Result.success(
jobMetricsService.getJobPipelineDetailMetricsRes(userId, jobInstanceId));
return Result.success(jobMetricsService.getJobPipelineDetailMetricsRes(jobInstanceId));
}

@GetMapping("/dag")
@ApiOperation(value = "get the job pipeline dag", httpMethod = "GET")
public Result<JobDAG> getJobDAG(
@ApiParam(value = "userId", required = true) @RequestAttribute Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId)
throws JsonProcessingException {

return Result.success(jobMetricsService.getJobDAG(userId, jobInstanceId));
return Result.success(jobMetricsService.getJobDAG(jobInstanceId));
}

@GetMapping("/summary")
@ApiOperation(value = "get the job pipeline summary metrics", httpMethod = "GET")
public Result<List<JobPipelineSummaryMetricsRes>> summary(
@ApiParam(value = "userId", required = true) @RequestAttribute Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId)
throws IOException {
return Result.success(
jobMetricsService.getJobPipelineSummaryMetrics(userId, jobInstanceId));
return Result.success(jobMetricsService.getJobPipelineSummaryMetrics(jobInstanceId));
}
}
Loading

0 comments on commit d49f500

Please sign in to comment.