It looks to me like you need some way to share the statement preparation across threads, as it can be used by any thread (and across log segments) once prepared. Probably easiest to do it during parsing of the log file.
We also have an issue with replay potentially over-parallelizing, and also potentially OOMing, as you're submitting straight to a thread pool after parsing each file. So there's nothing stopping us racing ahead and reading all of the log files (you have an unbounded queue), but since you submit each file separately you will spawn a thread/executor for each thread/segment combination, rather than each thread.
Probably we want to create some separate state to represent a thread, which we create once the first time we see a thread id, insert it into a map, and then place work directly onto this queue during parsing of all segments. We can submit a runnable immediately for processing this queue to represent a thread. We have a potential problem here, though, which is that we do not know if a thread died, so we can fill up the executor pool, so we for now let's use an unbounded executorpool and leave tackling this properly until we have everything else in place. We should then limit how many queries ahead we can read to prevent OOM.
Also, we're still replaying based on offset from last query, which means we will skew very quickly. We should be fixing an epoch (in nanos) such that you have a log epoch of L, and queries are run at T=L+X; when re-run we have a replay epoch of R, and we run queries at R+X