Kinexis is a lightweight Spring Boot library for implementing cache-aside, write-behind, and refresh-ahead data flows with Redis Streams.
The core idea is simple: application code keeps using ordinary services and repositories, while Kinexis coordinates cache reads, cache writes, durable stream events, asynchronous persistence, retry, and dead-letter recovery.
Kinexis is designed around explicit store adapters, not around a specific database technology. A store can be backed by JPA, MongoDB, Redis OM Spring, another Spring Data repository, or a custom implementation. The runtime works through EntityStore<T> and CacheStore<T> so the processing model stays platform-agnostic.
Kinexis provides:
| Capability | What Kinexis does |
|---|---|
| Cache-aside | Reads from cache first, falls back to a primary store, then repopulates the cache. |
| Write-behind | Publishes save/delete events to Redis Streams and persists to one or more backing stores asynchronously. |
| Refresh-ahead | Uses Redis key expiration events to refresh cached data when TTL-based cache entries expire. |
| Explicit store routing | Lets you register named stores and target groups such as mysql, mongo, primary, or archive. |
| Parallel fan-out | Writes one stream event to multiple target stores concurrently through a bounded executor. |
| Retry and DLQ | Retries pending stream records and moves exhausted failures to a dead-letter stream with structured metadata. |
| DLQ replay | Replays dead-letter records to the original stream, optionally overriding target stores. |
| Startup validation | Fails fast for missing stores, invalid parallelism, duplicate store names, and ambiguous target aliases. |
| Diagnostics | Exposes resolved entity/store metadata through KinexisDiagnosticsService. |
Kinexis does not add an HTTP diagnostics endpoint. The library exposes a service API so applications can decide whether to wire that into Actuator, an internal controller, logs, tests, or another operational surface.
For an annotated entity, Kinexis generates entity-specific infrastructure at compile time:
- a Redis OM repository for the cache representation
- a Redis Stream listener
- a stream processor
- a pending-message handler
- a lightweight
KinexisEntityRegistrycomponent - a key-expiration listener when refresh-ahead is enabled with a positive TTL
At runtime, the flow is:
- Your application calls a method inherited from
KinexisService<T>. - Reads consult
EntityStoreRegistryfor aCacheStore<T>and primaryEntityStore<T>. - Write-behind saves and deletes append versioned events to Redis Streams.
- Generated stream listeners read those events by entity-specific consumer group.
- Processors resolve target stores from
EntityStoreRegistry. - Target stores are invoked concurrently up to
kinexis.processing.max-parallel-stores. - Failed processing stays pending, is retried, and eventually moves to a dead-letter stream.
Add kinexis-core to your Spring Boot application.
<dependency>
<groupId>io.github.foogaro</groupId>
<artifactId>kinexis-core</artifactId>
<version>1.0.1</version>
</dependency>Import Kinexis configuration in your Spring Boot application:
import com.foogaro.kinexis.core.config.KinexisConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
@EnableRedisRepositories(basePackages = "com.example")
@Import(KinexisConfiguration.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}Configure Redis and Kinexis:
spring.data.redis.host=localhost
spring.data.redis.port=6379
kinexis.stream.poll-timeout=1s
kinexis.stream.batch-size=100
kinexis.stream.listener.pending.max-attempts=3
kinexis.stream.listener.pending.max-retention=120000
kinexis.stream.listener.pending.batch-size=50
kinexis.stream.listener.pending.fixed-delay=300000
kinexis.processing.max-parallel-stores=4
kinexis.validation.enabled=true
kinexis.validation.fail-fast=true
kinexis.stores.repository-discovery.enabled=falseThe jar includes Spring Boot configuration metadata for the kinexis.* namespace, so IDEs can autocomplete these properties without extra runtime dependencies.
Use @CachingPatterns on the entity class.
import com.foogaro.kinexis.core.annotation.CachingPatterns;
import com.foogaro.kinexis.core.model.CachingFormat;
import com.foogaro.kinexis.core.model.CachingPattern;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
@Entity
@CachingPatterns(
format = CachingFormat.JSON,
patterns = {
CachingPattern.CACHE_ASIDE,
CachingPattern.WRITE_BEHIND,
CachingPattern.REFRESH_AHEAD
},
ttl = 300
)
public class Employer {
@Id
private Long id;
private String name;
private String email;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
}@CachingPatterns options:
| Option | Meaning |
|---|---|
patterns |
Selects CACHE_ASIDE, WRITE_BEHIND, REFRESH_AHEAD, or NONE. |
format |
Selects Redis cache format: JSON or HASH. |
ttl |
TTL in seconds for cache writes. A value less than or equal to zero means no expiration. |
enabled |
If false, KinexisService bypasses cache and stream behavior and delegates to the primary store. |
The annotation processor expects an ID field annotated with jakarta.persistence.Id or javax.persistence.Id.
Application services extend KinexisService<T>.
import com.foogaro.kinexis.core.service.KinexisService;
import org.springframework.stereotype.Service;
@Service
public class EmployerService extends KinexisService<Employer> {
private final EmployerMysqlRepository repository;
public EmployerService(EmployerMysqlRepository repository) {
this.repository = repository;
}
public List<Employer> findAll() {
return repository.findAll();
}
}Inherited methods:
Optional<Employer> found = employerService.findById(42L);
Employer employer = new Employer();
employer.setId(42L);
employer.setName("Ada Lovelace");
employerService.save(employer);
employerService.update(employer);
employerService.delete(42L);For write-behind entities, save, update, and delete publish stream events instead of directly calling the database. For cache-aside entities, findById reads from cache first and then falls back to the primary store.
Custom repository queries remain your responsibility:
public Employer findByEmail(String email) {
return repository.findByEmail(email);
}Kinexis resolves stores through EntityStoreRegistry. The standard path is to define explicit EntityStore<T> and CacheStore<T> beans.
Repository-name discovery is deprecated and disabled by default. Use it only as a migration bridge:
kinexis.stores.repository-discovery.enabled=trueUse CrudRepositoryEntityStore for a Spring Data repository that should receive durable writes.
import com.foogaro.kinexis.core.service.BeanFinder;
import com.foogaro.kinexis.core.store.CrudRepositoryEntityStore;
import com.foogaro.kinexis.core.store.EntityStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
class EmployerStores {
@Bean
EntityStore<Employer> mysqlEmployerStore(
EmployerMysqlRepository repository,
BeanFinder beanFinder
) {
return CrudRepositoryEntityStore
.builder(Employer.class, repository, beanFinder)
.name("mysqlEmployerStore")
.targets("mysql", "primary")
.build();
}
}Use RedisOmCacheStore for Redis OM Spring repositories. It honors TTL by expiring the resolved Redis entity key after save.
import com.foogaro.kinexis.core.service.BeanFinder;
import com.foogaro.kinexis.core.store.CacheStore;
import com.foogaro.kinexis.core.store.RedisOmCacheStore;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
@Bean
CacheStore<Employer> redisEmployerCache(
EmployerRedisRepository repository,
BeanFinder beanFinder,
RedisTemplate<String, String> redisTemplate
) {
return RedisOmCacheStore
.builder(Employer.class, repository, beanFinder)
.name("redisEmployerCache")
.targets("redis", "cache")
.redisTemplate(redisTemplate)
.build();
}Use CrudRepositoryCacheStore when the cache repository is Spring Data compatible but not Redis OM specific.
@Bean
CacheStore<Employer> genericCacheStore(
EmployerCacheRepository repository,
BeanFinder beanFinder
) {
return CrudRepositoryCacheStore
.builder(Employer.class, repository, beanFinder)
.name("genericEmployerCache")
.targets("cache")
.build();
}CrudRepositoryCacheStore ignores TTL by default. Any CacheStore<T> can support TTL by overriding:
default T save(T entity, Duration ttl) {
return save(entity);
}Implement EntityStore<T> for any platform.
import com.foogaro.kinexis.core.store.EntityStore;
public class HttpEntityStore implements EntityStore<Employer> {
@Override
public String name() {
return "httpEmployerStore";
}
@Override
public Class<Employer> entityType() {
return Employer.class;
}
@Override
public Set<String> targets() {
return Set.of("http", "archive");
}
@Override
public Optional<Employer> findById(Object id) {
return Optional.empty();
}
@Override
public Employer save(Employer entity) {
return entity;
}
@Override
public void deleteById(Object id) {
}
}Implement CacheStore<T> when the store should be used as the cache.
public class LocalCacheStore implements CacheStore<Employer> {
// same EntityStore methods, plus optional TTL support
}Write-behind events can target all backing stores or a selected group.
employerService.save(employer); // all target stores
employerService.save(employer, "primary"); // stores tagged primary
employerService.save(employer, "mysql"); // stores tagged mysql
employerService.delete(42L, "archive"); // stores tagged archiveKinexisService validates selected targets before publishing a stream event. If no configured store matches the requested target, it throws an IllegalArgumentException and does not append to Redis Streams.
If an invalid event is appended directly to Redis Streams, the processor fails the message and leaves it pending for retry/DLQ handling.
One stream event can write to multiple stores. Kinexis invokes matching target stores concurrently through a bounded executor.
kinexis.processing.max-parallel-stores=4If one or more stores fail, the processor raises ProcessMessageException with the failed store names. The pending-message handler records the failure metadata in the dead-letter stream when retry attempts are exhausted.
Startup validation is enabled by default.
kinexis.validation.enabled=true
kinexis.validation.fail-fast=trueValidation checks:
kinexis.processing.max-parallel-stores >= 1- cache patterns have a configured
CacheStore - cache-aside and refresh-ahead have a configured primary
EntityStore - write-behind has at least one target
EntityStore - disabled entities have a primary
EntityStorefor direct delegation - store names are unique per entity
- target aliases are not ambiguous across stores for the same entity
Validation warnings:
REFRESH_AHEADis configured without a positive TTL- deprecated repository discovery is enabled
Entity discovery for validation comes from:
- generated
KinexisEntityRegistrycomponents - explicit store beans
- generated processors
KinexisServicebeans
Inject KinexisDiagnosticsService to inspect the runtime configuration.
import com.foogaro.kinexis.core.service.KinexisDiagnosticsService;
@Service
public class KinexisAdminService {
private final KinexisDiagnosticsService diagnosticsService;
public KinexisAdminService(KinexisDiagnosticsService diagnosticsService) {
this.diagnosticsService = diagnosticsService;
}
public List<KinexisDiagnosticsService.EntityDiagnostics> stores() {
return diagnosticsService.stores();
}
}Each EntityDiagnostics contains:
- entity class and name
- whether the entity is annotated
- whether Kinexis is enabled for the entity
- configured caching patterns
- TTL
- resolved cache store
- resolved primary store
- target stores
- all known stores for duplicate/ambiguity checks
Pending records are retried using Redis Stream pending-entry-list state. When a message exceeds the configured attempt limit, Kinexis writes it to a dead-letter stream with structured metadata.
DLQ fields include:
reasonerrorattemptsfailedStoreexceptionClassfailureTimestamp- original stream key
- original stream ID
- original consumer group
- original consumer
Replay a dead-letter record:
import com.foogaro.kinexis.core.service.KinexisDlqService;
Optional<String> replayedId = kinexisDlqService.replay(
Employer.class,
dlqRecordId
);Replay to selected targets:
kinexisDlqService.replay(Employer.class, dlqRecordId, "mysql", "primary");Replay and delete the DLQ record after successful append:
kinexisDlqService.replay(
Employer.class,
dlqRecordId,
KinexisDlqService.ReplayMode.REPLAY_AND_DELETE,
"mysql"
);| Property | Default | Description |
|---|---|---|
kinexis.stream.poll-timeout |
1s |
Redis Stream poll timeout. |
kinexis.stream.batch-size |
100 |
Records read per stream poll. |
kinexis.stream.listener.pending.max-attempts |
3 |
Attempts before DLQ. |
kinexis.stream.listener.pending.max-retention |
120000 |
Pending retention threshold in milliseconds. |
kinexis.stream.listener.pending.batch-size |
50 |
Pending records inspected per scan. |
kinexis.stream.listener.pending.fixed-delay |
300000 |
Scheduler delay for pending scans in milliseconds. |
kinexis.processing.max-parallel-stores |
available processors, minimum 2 | Maximum concurrent target-store calls per stream record. |
kinexis.validation.enabled |
true |
Enables startup validation. |
kinexis.validation.fail-fast |
true |
Fails startup when validation errors exist. |
kinexis.stores.repository-discovery.enabled |
false |
Enables deprecated repository-name discovery. |
For an entity named Employer, generated code is placed under an entity-specific package such as:
com.example.entity.employer.repository.EmployerRedisRepository
com.example.entity.employer.listener.EmployerStreamListener
com.example.entity.employer.processor.EmployerProcessor
com.example.entity.employer.handler.EmployerPendingMessageHandler
com.example.entity.employer.EmployerKinexisEntityRegistry
Refresh-ahead with positive TTL also generates:
com.example.entity.employer.listener.EmployerKeyExpirationListener
The generated processor is entity-based, not repository-based. It asks EntityStoreRegistry for stores at runtime, which is what allows one event to fan out to several backing platforms.
Run the full test suite:
./mvnw clean testThe integration tests use Testcontainers with Redis and cover stream append, save, delete, failed processing, pending retry, DLQ, replay, TTL, explicit stores, repository discovery opt-in, diagnostics, validation, target routing, and parallel fan-out.
Kinexis intentionally keeps the public runtime small:
@CachingPatternsdescribes desired behavior.KinexisService<T>is the service base class used by application code.EntityStore<T>andCacheStore<T>make stores explicit and platform-agnostic.EntityStoreRegistryresolves stores for services and processors.KinexisDiagnosticsServiceexposes runtime metadata.KinexisDlqServicehandles operational replay.
The library uses Redis for cache and streams, but backing stores are intentionally not Redis-specific.
Kinexis is a personal project and is not affiliated with or endorsed by Redis Inc.