At the moment, I keep getting this error:
```
org.springframework.messaging.MessagingException
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:433)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:355)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:54)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:348)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.UncheckedIOException: IOException when retrieving /to_erp/orders/49230084_COMPLETED_ORDER.json
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:249)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:228)
at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:47)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:220)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:450)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:419)
... 13 more
Caused by: SFTP error (SSH_FX_NO_SUCH_FILE): The file does not exist.
at org.apache.sshd.sftp.client.impl.AbstractSftpClient.throwStatusException(AbstractSftpClient.java:277)
at org.apache.sshd.sftp.client.impl.AbstractSftpClient.checkHandleResponse(AbstractSftpClient.java:299)
at org.apache.sshd.sftp.client.impl.AbstractSftpClient.checkHandle(AbstractSftpClient.java:290)
at org.apache.sshd.sftp.client.impl.AbstractSftpClient.open(AbstractSftpClient.java:589)
at org.apache.sshd.sftp.client.impl.SftpInputStreamAsync.<init>(SftpInputStreamAsync.java:75)
at org.apache.sshd.sftp.client.impl.AbstractSftpClient.read(AbstractSftpClient.java:1196)
at org.apache.sshd.sftp.client.SftpClient.read(SftpClient.java:909)
at org.apache.sshd.sftp.client.SftpClient.read(SftpClient.java:905)
at org.springframework.integration.sftp.session.SftpSession.readRaw(SftpSession.java:125)
at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.readRaw(CachingSessionFactory.java:296)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:239)
... 19 more
```
This happens even though I use the `SftpPersistentAcceptOnceFileListFilter`. Here is how I configure my subscribers:
```java
@Profile("!test")
@Component
public class CreatedOrderSubscriber {

    private static final String REMOTE_DIRECTORY = "/to_wms/orders/";
    private static final String FILE_PREFIX = "created_order_stream_";
    private static final int MAX_FETCH_SIZE = 50;

    private final ConcurrentMetadataStore metadataStore;
    private final OilOrderCreatedService oilOrderCreatedService;
    private final SessionFactory<DirEntry> sftpSessionFactory;
    private final AdviceFactory adviceChainFactory;

    public CreatedOrderSubscriber(
            ConcurrentMetadataStore metadataStore,
            AdviceFactory adviceChainFactory,
            OilOrderCreatedService oilOrderCreatedService,
            @Qualifier("wmsSftpSessionFactory") SessionFactory<DirEntry> sftpSessionFactory) {
        this.metadataStore = metadataStore;
        this.oilOrderCreatedService = oilOrderCreatedService;
        this.sftpSessionFactory = sftpSessionFactory;
        this.adviceChainFactory = adviceChainFactory;
    }

    @Bean
    @ServiceActivator(inputChannel = "createdOrderV8Data", adviceChain = "afterCreatedOrder")
    public MessageHandler handleCreatedOrder() {
        var callback = createMessageHandlerCallback();
        return MessageHandlerFactory.createHandler(callback, OrderUpdateHeaderDto.class);
    }

    @Bean
    @InboundChannelAdapter(channel = "createdOrderV8Stream", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "10"))
    public MessageSource<InputStream> createdOrderFtpMessageSource(SftpRemoteFileTemplate createdOrderV8Template) {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(createdOrderV8Template);
        messageSource.setRemoteDirectory(REMOTE_DIRECTORY);
        messageSource.setFilter(fileListFilter());
        messageSource.setMaxFetchSize(MAX_FETCH_SIZE);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "createdOrderV8Stream", outputChannel = "createdOrderV8Data")
    public org.springframework.integration.transformer.Transformer createdOrderTransformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice afterCreatedOrder() {
        return adviceChainFactory.createAdviceWithRemovalUponSuccess("createdOrderV8Template");
    }

    @Bean
    public SftpRemoteFileTemplate createdOrderV8Template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory);
    }

    private Function<OrderUpdateHeaderDto, Void> createMessageHandlerCallback() {
        return dto -> {
            oilOrderCreatedService.handleMessage(dto);
            return null;
        };
    }

    private ChainFileListFilter<DirEntry> fileListFilter() {
        var sftpPersistentAcceptOnceFilter = new SftpPersistentAcceptOnceFileListFilter(metadataStore, FILE_PREFIX);
        var nameFilter = new SftpSimplePatternFileListFilter("*_NEW_ORDER.json");
        var filters = Set.of(nameFilter, sftpPersistentAcceptOnceFilter);
        return new ChainFileListFilter<>(filters);
    }
}
```
Files are processed and then removed. What seems to happen is that the application keeps trying to process files that were already ingested by another app replica. I'm guessing that I misconfigured the accept-once filter, but I do not know how to fix it.
The metadata store is defined like this:
```java
@Profile("!test")
@Configuration
public class MetadataStoreConfig {

    @Bean
    public ConcurrentMetadataStore metadataStore(DataSource dataSource) {
        return new JdbcMetadataStore(dataSource);
    }
}
```
How do I properly set up SFTP file subscribers so that they can run in multiple app replicas without interfering with each other?
I tried to use the `SftpPersistentAcceptOnceFileListFilter`. It persists a record in a metadata store that is supposed to act as a sort of lock, so that file processing can happen from multiple app instances without them affecting each other. So far, this has not worked (probably due to a misconfiguration somewhere).
Alternatively, I could let only one instance read from the SFTP server, but I do not like the additional complexity that this solution would bring. Reading from multiple app replicas should be possible with Spring Boot.
From your question it is not clear which `ConcurrentMetadataStore` you use.
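When several instances of your application are running, the `MetadataStore` has to be shared and persistent: https://docs.spring.io/spring-integration/reference/meta-data-store.html. A `JdbcMetadataStore` on a database that all replicas connect to meets that requirement.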
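Why "shared" matters can be shown with a deliberately simplified model. This is not the Spring Integration filter: a `ConcurrentMap` stands in for the metadata store, and `putIfAbsent` plays the role of the atomic claim that the persistent accept-once filter performs against that store (modification-time handling is ignored). The class and method names are hypothetical. With one store shared by two replicas, a file is accepted once; with a separate store per replica, both replicas accept it and then race to read and delete it.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical, simplified model of a persistent accept-once filter (NOT the Spring class).
 * The map stands in for the metadata store; putIfAbsent is the atomic "claim" step.
 */
final class AcceptOnceModel {

    private final ConcurrentMap<String, String> store;
    private final String prefix;

    AcceptOnceModel(ConcurrentMap<String, String> store, String prefix) {
        this.store = store;
        this.prefix = prefix;
    }

    /** Returns true only for the first caller that claims prefix + fileName in this store. */
    boolean accept(String fileName) {
        return store.putIfAbsent(prefix + fileName, "claimed") == null;
    }

    /** Counts how many replicas accept the same file name. */
    static int acceptors(List<AcceptOnceModel> replicas, String fileName) {
        int count = 0;
        for (AcceptOnceModel replica : replicas) {
            if (replica.accept(fileName)) {
                count++;
            }
        }
        return count;
    }

    /** Two replicas sharing ONE store (for example one JDBC table): the file is claimed once. */
    static int sharedStoreAcceptors() {
        ConcurrentMap<String, String> shared = new ConcurrentHashMap<>();
        return acceptors(List.of(
                new AcceptOnceModel(shared, "created_order_stream_"),
                new AcceptOnceModel(shared, "created_order_stream_")),
                "example_NEW_ORDER.json");
    }

    /** Two replicas, each with its OWN store: both accept the same file. */
    static int separateStoreAcceptors() {
        return acceptors(List.of(
                new AcceptOnceModel(new ConcurrentHashMap<>(), "created_order_stream_"),
                new AcceptOnceModel(new ConcurrentHashMap<>(), "created_order_stream_")),
                "example_NEW_ORDER.json");
    }
}
```

The same model also suggests that every replica must use the same key prefix (`FILE_PREFIX` in the question), since keys with different prefixes never collide.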
You may also need to look into a `ChainFileListFilter` instead, and order your filters so that `nameFilter` is consulted first. That filter is stateless, so a file whose name does not match leaves no entry in the metadata store.
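The effect of the order can be sketched with another simplified model (hypothetical names, plain `Predicate`s instead of the Spring filter classes). It follows the chain semantics, where each filter only sees what the previous one accepted, and counts how many keys the accept-once step writes for a listing that contains a file meant for another subscriber. Note also that `Set.of(...)`, as used in the question, has an unspecified iteration order, so it does not guarantee which filter runs first; an ordered collection such as `List.of(...)` does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/** Hypothetical, simplified model of a filter chain (NOT the Spring ChainFileListFilter). */
final class FilterChainModel {

    /** Applies filters in list order; each filter only sees names the previous one accepted. */
    static List<String> applyChain(List<Predicate<String>> chain, List<String> names) {
        List<String> remaining = names;
        for (Predicate<String> filter : chain) {
            List<String> next = new ArrayList<>();
            for (String name : remaining) {
                if (filter.test(name)) {
                    next.add(name);
                }
            }
            remaining = next;
        }
        return remaining;
    }

    /** Runs one listing through the chain and returns how many keys the accept-once step wrote. */
    static int storeEntriesAfterPoll(boolean nameFilterFirst) {
        Map<String, String> store = new ConcurrentHashMap<>();
        // Stateless: decides on the name alone.
        Predicate<String> nameFilter = name -> name.endsWith("_NEW_ORDER.json");
        // Stateful: records every name it is asked about.
        Predicate<String> acceptOnce =
                name -> store.putIfAbsent("created_order_stream_" + name, "claimed") == null;
        List<Predicate<String>> chain = nameFilterFirst
                ? List.of(nameFilter, acceptOnce)
                : List.of(acceptOnce, nameFilter);
        List<String> accepted =
                applyChain(chain, List.of("1_NEW_ORDER.json", "2_COMPLETED_ORDER.json"));
        // On a first poll both orders accept the same file; only the store side effect differs.
        if (!accepted.equals(List.of("1_NEW_ORDER.json"))) {
            throw new IllegalStateException("unexpected result: " + accepted);
        }
        return store.size();
    }
}
```

With the name filter first, only the matching file gets a key; with the accept-once filter first, the `_COMPLETED_ORDER.json` file is recorded too, even though this subscriber never processes it.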
UPDATE
See `SftpStreamingMessageSource.setMaxFetchSize()` (right, you use it already):
```java
/**
 * Set the maximum number of objects the source should fetch if it is necessary to
 * fetch objects. Setting the
 * maxFetchSize to 0 disables remote fetching, a negative value indicates no limit.
 * @param maxFetchSize the max fetch size; a negative value means unlimited.
 */
@ManagedAttribute(description = "Maximum objects to fetch")
void setMaxFetchSize(int maxFetchSize);
```
See also the docs on how this option behaves: https://docs.spring.io/spring-integration/reference/sftp/max-fetch.html#page-title.
So, apparently you ran into a situation where remote files are fetched first and emitted only later, after filtering. One of the instances removes a file, and another one then tries to read it based on its own cached list.
See if setting that option to `1` helps.
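A configuration sketch that combines both suggestions: an ordered filter chain (stateless name filter first) and `setMaxFetchSize(1)`. It assumes Spring Integration 6.x with Apache MINA SSHD (matching the `org.apache.sshd` classes in the stack trace) and reuses the channel, template, directory, pattern and prefix from the question. The class name `CreatedOrderSourceSketch` is hypothetical; in the question's code these two methods would replace the existing `createdOrderFtpMessageSource()` and `fileListFilter()`. Whether a fetch size of `1` removes the error in this setup is not verified here.

```java
import java.io.InputStream;
import java.util.List;

import org.apache.sshd.sftp.client.SftpClient.DirEntry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
import org.springframework.integration.sftp.inbound.SftpStreamingMessageSource;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;

// Hypothetical class name; shown standalone only to keep the sketch self-contained.
@Configuration
class CreatedOrderSourceSketch {

    @Bean
    @InboundChannelAdapter(channel = "createdOrderV8Stream",
            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "10"))
    MessageSource<InputStream> createdOrderFtpMessageSource(SftpRemoteFileTemplate createdOrderV8Template,
                                                            ConcurrentMetadataStore metadataStore) {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(createdOrderV8Template);
        messageSource.setRemoteDirectory("/to_wms/orders/");
        messageSource.setFilter(fileListFilter(metadataStore));
        // Fetch at most one remote entry at a time instead of 50.
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    private static ChainFileListFilter<DirEntry> fileListFilter(ConcurrentMetadataStore metadataStore) {
        // A List keeps the order: the stateless name filter runs first,
        // so only matching files ever reach the persistent accept-once filter.
        List<FileListFilter<DirEntry>> filters = List.of(
                new SftpSimplePatternFileListFilter("*_NEW_ORDER.json"),
                new SftpPersistentAcceptOnceFileListFilter(metadataStore, "created_order_stream_"));
        return new ChainFileListFilter<>(filters);
    }
}
```

Every replica has to run this with the same shared `ConcurrentMetadataStore` and the same prefix for the accept-once claims to be exclusive.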
UPDATE
Here is a sample that demonstrates the feature working as expected: https://github.com/artembilan/sandbox/tree/master/so-79334692