Persistence scheme for DelayQueue

As we mentioned in the last article, we used DelayQueue to solve some tasks that need to be delayed in our project, but recently we have a problem in the production environment. When the server is restarted, the delayed tasks that are not executed disappear. Therefore, how to delay the task persistence is put on the agenda.

The specific implementation of DelayQueue has been mentioned in the general component of DelayQueue, the last article. This article is no longer repeated.

The main topic of this issue is to explore how to persist a delayed task.

What is the persistence of delayed tasks? As the name suggests, the execution of these delayed tasks requires the necessary data to be stored in a database or redis.

So why should we persist? Very simple, because the delay task data is placed in memory, so you need to record our own writing persistence to achieve high availability, or server failure downtime or new release caused when a server restart, then those who did not perform the task of data delay will be completely lost, this is obviously we don’t want to see.

As I currently used in 1 schemes:
, need to use DelayQueue, call the saveDelayTask method, the parameters required routing tag delay task function strategy of the factory class, parameters of messageBody execution method required JSON format, how long the delay of execution in seconds for delayTime.
2, task scheduler, executes the getNotCompletedMessageList method every 15 seconds.

In most cases, in the expected execution time on time to execute the processTask method, then the abnormal situation, if the server restart, then the timed task scheduling will find those not scheduled execution delay task after a certain period of time, the processTask method through the timing of task scheduling in order to perform their respective tasks.
abnormal state, delayed task execution than expected execution time is delayed, I design is the scope of the present we can allow this, we can postpone the appropriate set of alternative time.

The core code is as follows. Other code is simple, not published.

Public void saveDelayTask (String tag, String messageBody, Long delayTime) {DelayTaskMessage delayTaskMessage = new (DelayTaskMessage); delayTaskMessage.setTag (tag); LocalDateTime now = LocalDateTime.now (delayTaskMessage.setCreateTime); (now); delayTaskMessage.setUpdateTime (now); delayTaskMessage.setDelayTime (delayTime); delayTaskMessage.setExpectedTime (now.plusSeconds (delayTime)); delayTaskMessage.setMessageBody (delayTaskMessage.setStatus (messageBody); KafkaMessageStatusEnum.NOT_COMPLETE.getCode (int)); res = delayTaskMessageMapper.insertDelayTaskMessage (delayTaskMessage); if (RES < = 0) {log.error ("ybBrokerApp|insertDelayTaskMessage error res< =0 throw new RuntimeExcept"); Ion (insertDelayTaskMessage error, res< =0 TaskMessage taskMessage;}) = new TaskMessage (delayTime * 1000, messageBody, function -> this.processTask (delayTaskMessage)); DelayQueue< TaskMessage> queue = taskManager.getQueue (queue.offer); (taskMessage);}

First, analyze the saveDelayTask method used to hold the delayed task.

Tag is the mark of a delayed task, which is used to specify the corresponding policy class.

MessageBody is primarily used to store some of the necessary data for performing a delayed task and store it in the JSON method.

DelayTime is the delay time, the default is s, mainly for ease of use.

The main function of this method is that the delay task execution is not the first save, automatically according to the expected execution time delay time delay calculation of the task, in order to facilitate the subsequent tracking compensation algorithm, and then use the characteristics of DelayQueue, this will be submitted to the delay queue delay task execution.

Public int processTask (DelayTaskMessage param) {DelayTaskMessage delayTaskMessage = delayTaskMessageMapper.getDelayTaskMessageById (param.getId ()); try {if (null! = delayTaskMessage & & Objects.equals (delayTaskMessage.getStatus), (!) (KafkaMessageStatusEnum.NOT_COMPLETE.getCode)) {log.info ("processTask executed already"); return 1;} if (null! = delayTaskMessage) {DelayTaskExecuteProcessor processor = processorFactory.getExecuteProcessor (delayTaskMessage.getTag ()); if (processor! = null) {processor.execute} {(delayTaskMessage); else throw new RuntimeException ("no such processor, tag= + delayTaskMessage.getT AG (delayTaskMessage.setStatus));} ((KafkaMessageStatusEnum.COMPLETE.getCode)); delayTaskMessage.setExecutionTime ((LocalDateTime.now)); try (InetAddress.getLocalHost) {delayTaskMessage.setIpAddress ((.GetHostAddress));} catch (UnknownHostException Ex) {log.error ("Address.getLocalHost error", ex);} int res = delayTaskMessageMapper.updateDelayTaskMessageStatus (delayTaskMessage); if (RES = < 0) {log.error ("updateDelayTaskMessageStatus error res< =0 throw new"); RuntimeException ("updateDelayTaskMessageStatus error");} else {return 1;} Log.error ("ybBrokerApp processTask error, delayTaskMessage is null delayTaskMessageId=", param.getId (0)); return;}} catch (Exception E) {log.error ("ybBrokerApp processTask error param (param.toString) =" + + "|", e); if (null! = delayTaskMessage) {delayTaskMessage.setStatus (KafkaMessageStatusEnum.FAIL.getCode () (()); delayTaskMessage.setErrorStack e.getMessage; try) {delayTaskMessage.setIpAddress (InetAddress.getLocalHost) (.GetHostAddress ());} catch (UnknownHostException Ex) {log.error ("Address.getLocalHost error", ex delayTaskMessageMapper.updateDelayTaskMessageStatus (delayTaskMessa);} GE)} return 0;}

Then it’s the kernel’s processTask approach to processing latency tasks.

1, according to ID, in the database to find the corresponding delay task to perform persistent data.
2, if this persisted data is not empty and the state is not an out of execution state, then the task has been executed to prevent repeated execution. The status here has three main states, not executed, executed successfully, and failed to execute.
3, if this persisted data is not empty and is not executed, find the corresponding policy class of the tag to execute the corresponding execute method.
4, which records the IP address of the execution method, facilitates subsequent analysis, and changes the status of this persistent data to the state of successful execution.
5, if the execution fails, record the IP address and change the status of the data to execution failure.

In the fourth step, why do you have to implement the method first and then change the status? I think so, implementation methods of delay task is the external write, when the component design cannot control will not appear abnormal, and the modification method of data persistence state is controllable, so from this perspective, I think the first implementation method to modify the state of something more reasonable.

It should be noted that this method does not take into account concurrency, because I am in the compensation scheme and the additional delay for a period of time, the occurrence of concurrency is very small, it is not necessary to consider this situation.

Public List< DelayTaskMessage> getNotCompletedMessageList (int total, int index) {LocalDateTime = LocalDateTime.now (expectedTime).PlusSeconds (15L); List< DelayTaskMessage> delayTaskMessageList = delayTaskMessageMapper.getNotCompletedMessageList (expectedTime, total, index); if (CollectionUtils.isEmpty (delayTaskMessageList)) {return} (Lists.newArrayList); return delayTaskMessageList;}

Finally, the implementation of the compensation scheme, I am in a regular task to ensure that the delay will be carried out at least once.

My design is to go every 15s to traverse the delayed tasks that are still outstanding after the expected execution time of +15s. The processTask method is then called again by the latency task in these lists.

If the final delay is executed by the compensation scheme, the task will be later than the expected execution time, and 15 to 30s will be executed. At present, this additional delay is acceptable in our project. The extra delay will be modified according to the actual circumstances.

The above is my persistence plan for DelayQueue design. If we have better opinions, we can discuss it together.