Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env groovy
- import groovy.cli.picocli.CliBuilder
- import io.lettuce.core.RedisClient
- import org.apache.logging.log4j.Level
- import org.apache.logging.log4j.LogManager
- import org.apache.logging.log4j.core.config.Configurator
- import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory
- import reactor.core.publisher.Flux
- import reactor.core.publisher.Mono
- import reactor.core.scheduler.Schedulers
- import reactor.util.function.Tuples
- import java.time.Duration
- import java.time.Instant
- import java.util.concurrent.atomic.AtomicBoolean
- @GrabConfig(systemClassLoader = true)
- @Grab('org.apache.logging.log4j:log4j-api:2.19.0')
- @Grab('org.apache.logging.log4j:log4j-core:2.19.0')
- @Grab('org.apache.logging.log4j:log4j-slf4j-impl:2.19.0')
- @Grab('org.codehaus.groovy:groovy-cli-picocli:3.0.13')
- @Grab('info.picocli:picocli:4.6.3')
- @Grab('io.lettuce:lettuce-core:6.2.1.RELEASE')
// cli definition
//
// Declares the command line options of the throughput test and parses the
// script arguments into `cliOpts`, which the rest of the script reads.
def cli = new CliBuilder(name: 'redis-throughput-test')
cli.parser.caseInsensitiveEnumValuesAllowed(true)
cli.usageMessage.sortOptions(false)
cli.usageMessage.autoWidth(true)
cli.usageMessage.width(120)
cli.h(longOpt: 'help', 'show usage')
// Not marked as required: the option carries a default value, and a required
// option would make that default unreachable and would also reject a plain
// `-h` invocation with a "missing required option" error.
cli.u(longOpt: 'redis-url', args: 1, argName: 'url', type: String,
        defaultValue: 'redis://localhost:6379',
        'redis URL (default: redis://localhost:6379)')
cli.s(longOpt: 'request-size', args: 1, argName: 'size', type: Integer, defaultValue: '51200',
        'size of redis request (default: 51200 - 50kb)')
cli.p(longOpt: 'parallelism', args: 1, argName: 'num', type: Integer, defaultValue: '1',
        'number of parallel redis requests (default: 1)')
cli.v(longOpt: 'v', args: '0', 'verbose output')
cli.vv(longOpt: 'vv', args: '0', 'more verbose output')

// parse cli args
def cliOpts = cli.parse(args)
if (!cliOpts) {
    // a falsy result means there are no parsed options to work with, so stop here
    return
}
if (cliOpts.h) {
    cli.usage()
    return
}
// logger configuration
//
// Builds a log4j2 configuration programmatically: one console appender, a root
// logger at ERROR, and a non-additive 'script' logger whose level follows the
// -v / -vv flags. The resulting logger is published as the script binding `log`.
def logLevel
if (cliOpts.vv) {
    logLevel = Level.TRACE
} else if (cliOpts.v) {
    logLevel = Level.DEBUG
} else {
    logLevel = Level.INFO
}
// plain messages at INFO, timestamped messages with throwables when verbose
def layoutPattern = logLevel <= Level.INFO ? '%msg%n' : '%d %msg%n%throwable'

def builder = ConfigurationBuilderFactory.newConfigurationBuilder()
builder.add(
        builder.newAppender('console', 'Console')
                .add(builder.newLayout('PatternLayout').addAttribute('pattern', layoutPattern)))
builder.add(
        builder.newRootLogger(Level.ERROR)
                .add(builder.newAppenderRef('console')))
builder.add(
        builder.newLogger('script', logLevel)
                .add(builder.newAppenderRef('console'))
                .addAttribute('additivity', false))

// no `def`: `log` must live in the script binding so that script methods can use it
log = Configurator.initialize(LogManager.classLoader, builder.build()).getLogger('script')
// validate cli args
//
// Reads the parsed option values and terminates with exit code 1 on the first
// value that is not strictly positive.
def redisUrl = cliOpts['redis-url']
def requestSize = cliOpts['request-size']
def parallelism = cliOpts['parallelism']

def violation = requestSize < 1 ? "request-size must be greater than 0"
        : parallelism < 1 ? "parallelism must be greater than 0"
        : null
if (violation != null) {
    log.error(violation)
    System.exit(1)
}

// print all args
if (log.isDebugEnabled()) {
    log.debug('arguments:')
    def effectiveArgs = [
            logLevel   : logLevel,
            redisUrl   : redisUrl,
            requestSize: requestSize,
            parallelism: parallelism
    ]
    printMapEntriesIndented(effectiveArgs, Level.DEBUG)
    log.debug('')
}
// *****************************************************************************
// main logic
//
// Repeatedly writes (SETEX) and reads back (GET) a payload of `requestSize`
// characters, with at most `parallelism` round trips in flight, and logs running
// averages until the user presses enter.
def reqSizeKb = requestSize / 1024
def isCancelled = new AtomicBoolean(false)
RedisClient.create(redisUrl).withCloseable { RedisClient client ->
    def connect = { client.connect() }
    def disconnect = { it.close() }
    def sub = Flux
            .using(connect, { plainConn ->
                def redis = plainConn.reactive()
                def data = 'x'.multiply(requestSize)
                Flux.range(0, parallelism * 100).repeat()
                        .publishOn(Schedulers.boundedElastic())
                        .flatMap({ idx ->
                            // defer so the start timestamp is taken at subscription time
                            Mono.defer { Mono.just(Instant.now()) }
                                    .flatMap { reqStartedAt ->
                                        redis.setex("test-key-${idx}".toString(), 60, data)
                                                .then(redis.get("test-key-${idx}".toString()))
                                                .map {
                                                    Duration.between(reqStartedAt, Instant.now())
                                                }
                                    }
                        }, parallelism)
                        // running totals: t1 = number of requests (two per round trip),
                        // t2 = sum of the measured round-trip durations
                        .scan(Tuples.of(0 as Long, Duration.ZERO), { acc, duration ->
                            Tuples.of(acc.t1 + 2, acc.t2 + duration)
                        })
                        // drop the (0, ZERO) seed that scan emits first
                        .skip(1)
            }, disconnect)
            .takeUntil { isCancelled.get() }
            .subscribe({
                def totalRequests = it.t1
                if (totalRequests % 2000 == 0) {
                    def totalDuration = it.t2
                    def avgLatency = totalDuration.dividedBy(totalRequests)
                    // NOTE(review): totalDuration is the sum of per-round-trip latencies, not
                    // wall-clock time, so with parallelism > 1 this presumably reports
                    // per-slot rather than aggregate throughput - confirm the intent.
                    def avgThroughput = (totalRequests * reqSizeKb / totalDuration.toNanos()) * 1000000000
                    log.info 'avg latency : {}', avgLatency
                    log.info 'avg throughput (kb/s) : {}', avgThroughput
                }
            }, { err ->
                // without an error consumer a connection or command failure would never be
                // reported through the script logger while the script keeps waiting for input
                log.error('redis throughput test failed, press enter to exit', err)
            })
    log.info '<press enter to stop test>'
    sleep 2_000
    def console = System.console()
    2.times { log.info '' }
    if (console != null) {
        console.readLine()
    } else {
        // System.console() is null when no interactive terminal is attached (IDE, pipe,
        // nohup); read from stdin instead of failing with a NullPointerException.
        // On end-of-input this returns immediately and the test stops.
        System.in.newReader().readLine()
    }
    isCancelled.set(true)
    // wait for the pipeline to observe the flag and release the connection
    // before withCloseable closes the client
    while (!sub.disposed) {
        sleep 10
    }
}
log.info 'done'
// *******************************************************************************
// functions

/**
 * Logs every entry of the given map as an indented "key : value" line, padding
 * the keys on the right so that the separators line up.
 *
 * @param map          entries to print
 * @param logLevel     level the lines are logged at (default: INFO)
 * @param indentSymbol symbol repeated to build the indentation (default: a single space)
 * @param indentNum    how many times the indent symbol is repeated (default: 2)
 * @return the map that was passed in
 */
def printMapEntriesIndented(Map map, Level logLevel = Level.INFO, def indentSymbol = ' ', def indentNum = 2) {
    // width of the longest key; null for an empty map, in which case it is never used
    def keyWidth = map.keySet().collect { key -> key.toString().size() }.max()
    for (entry in map.entrySet()) {
        def paddedKey = entry.key.toString().padRight(keyWidth)
        log.log(logLevel, "${indentSymbol * indentNum}${paddedKey} : ${entry.value}")
    }
    return map
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement