Advertisement
dburyak

Untitled

Nov 17th, 2022
2,449
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Groovy 6.14 KB | None | 0 0
  1. #!/usr/bin/env groovy
  2. import groovy.cli.picocli.CliBuilder
  3. import io.lettuce.core.RedisClient
  4. import org.apache.logging.log4j.Level
  5. import org.apache.logging.log4j.LogManager
  6. import org.apache.logging.log4j.core.config.Configurator
  7. import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory
  8. import reactor.core.publisher.Flux
  9. import reactor.core.publisher.Mono
  10. import reactor.core.scheduler.Schedulers
  11. import reactor.util.function.Tuples
  12.  
  13. import java.time.Duration
  14. import java.time.Instant
  15. import java.util.concurrent.atomic.AtomicBoolean
  16.  
  17. @GrabConfig(systemClassLoader = true)
  18. @Grab('org.apache.logging.log4j:log4j-api:2.19.0')
  19. @Grab('org.apache.logging.log4j:log4j-core:2.19.0')
  20. @Grab('org.apache.logging.log4j:log4j-slf4j-impl:2.19.0')
  21. @Grab('org.codehaus.groovy:groovy-cli-picocli:3.0.13')
  22. @Grab('info.picocli:picocli:4.6.3')
  23. @Grab('io.lettuce:lettuce-core:6.2.1.RELEASE')
  24.  
  25.  
  26. // cli definition
  27. def cli = new CliBuilder(name: 'redis-throughput-test')
  28. cli.parser.caseInsensitiveEnumValuesAllowed(true)
  29. cli.usageMessage.sortOptions(false)
  30. cli.usageMessage.autoWidth(true)
  31. cli.usageMessage.width(120)
  32. cli.h(longOpt: 'help', 'show usage')
  33. cli.u(longOpt: 'redis-url', args: 1, argName: 'url', required: true, type: String,
  34.         defaultValue: 'redis://localhost:6379',
  35.         'redis URL (default: redis://localhost:6379)')
  36. cli.s(longOpt: 'request-size', args: 1, argName: 'size', type: Integer, defaultValue: '51200',
  37.         'size of redis request (default: 51200 - 50kb)')
  38. cli.p(longOpt: 'parallelism', args: 1, argName: 'num', type: Integer, defaultValue: '1',
  39.         'number of parallel redis requests (default: 1)')
  40. cli.v(longOpt: 'v', args: '0', 'verbose output')
  41. cli.vv(longOpt: 'vv', args: '0', 'more verbose output')
  42.  
  43.  
  44. // parse cli args
  45. def cliOpts = cli.parse(args)
  46. if (!cliOpts) {
  47.     return
  48. }
  49. if (cliOpts.h) {
  50.     cli.usage()
  51.     return
  52. }
  53.  
  54.  
  55. // logger configuration
  56. def logCfg = ConfigurationBuilderFactory.newConfigurationBuilder()
  57. def logConsoleAppender = logCfg.newAppender('console', 'Console')
  58. def logPatternLayout = logCfg.newLayout('PatternLayout')
  59. def logLevel = cliOpts.vv ? Level.TRACE
  60.         : cliOpts.v ? Level.DEBUG
  61.         : Level.INFO
  62. logPatternLayout.addAttribute('pattern', logLevel <= Level.INFO ? '%msg%n' : '%d %msg%n%throwable')
  63. logConsoleAppender.add(logPatternLayout)
  64. logCfg.add(logConsoleAppender)
  65. def rootLogger = logCfg.newRootLogger(Level.ERROR)
  66. rootLogger.add(logCfg.newAppenderRef('console'))
  67. logCfg.add(rootLogger)
  68. def scriptLogger = logCfg.newLogger('script', logLevel)
  69. scriptLogger.add(logCfg.newAppenderRef('console'))
  70. scriptLogger.addAttribute('additivity', false)
  71. logCfg.add(scriptLogger)
  72. def logCtx = Configurator.initialize(LogManager.classLoader, logCfg.build())
  73. log = logCtx.getLogger('script')
  74.  
  75.  
  76. // validate cli args
  77. def requestSize = cliOpts['request-size']
  78. def redisUrl = cliOpts['redis-url']
  79. if (requestSize < 1) {
  80.     log.error("request-size must be greater than 0")
  81.     System.exit(1)
  82. }
  83. def parallelism = cliOpts['parallelism']
  84. if (parallelism < 1) {
  85.     log.error("parallelism must be greater than 0")
  86.     System.exit(1)
  87. }
  88.  
  89.  
  90. // print all args
  91. if (log.debugEnabled) {
  92.     log.debug 'arguments:'
  93.     printMapEntriesIndented([
  94.             logLevel   : logLevel,
  95.             redisUrl   : redisUrl,
  96.             requestSize: requestSize,
  97.             parallelism: parallelism
  98.     ], Level.DEBUG)
  99.     log.debug ''
  100. }
  101.  
  102.  
  103. // *****************************************************************************
  104. // main logic
  105. def reqSizeKb = requestSize / 1024
  106. def isCancelled = new AtomicBoolean(false)
  107. RedisClient.create(redisUrl).withCloseable { RedisClient client ->
  108.     def connect = { client.connect() }
  109.     def disconnect = { it.close() }
  110.     def sub = Flux
  111.             .using(connect, { plainConn ->
  112.                 def redis = plainConn.reactive()
  113.                 def data = 'x'.multiply(requestSize)
  114.                 Flux.range(0, parallelism * 100).repeat()
  115.                         .publishOn(Schedulers.boundedElastic())
  116.                         .flatMap({ idx ->
  117.                             Mono.defer { Mono.just(Instant.now()) }
  118.                                     .flatMap { reqStartedAt ->
  119.                                         redis.setex("test-key-${idx}".toString(), 60, data)
  120.                                                 .then(redis.get("test-key-${idx}".toString()))
  121.                                                 .map {
  122.                                                     Duration.between(reqStartedAt, Instant.now())
  123.                                                 }
  124.                                     }
  125.                         }, parallelism)
  126.                         .scan(Tuples.of(0 as Long, Duration.ZERO), { acc, duration ->
  127.                             Tuples.of(acc.t1 + 2, acc.t2 + duration)
  128.                         })
  129.                         .skip(1)
  130.             }, disconnect)
  131.             .takeUntil { isCancelled.get() }
  132.             .subscribe {
  133.                 def totalRequests = it.t1
  134.                 if (totalRequests % 2000 == 0) {
  135.                     def totalDuration = it.t2
  136.                     def avgLatency = totalDuration.dividedBy(totalRequests)
  137.                     def avgThroughput = (totalRequests * reqSizeKb / totalDuration.toNanos()) * 1000000000
  138.                     log.info 'avg latency           : {}', avgLatency
  139.                     log.info 'avg throughput (kb/s) : {}', avgThroughput
  140.                 }
  141.             }
  142.     log.info '<press enter to stop test>'
  143.     sleep 2_000
  144.     def console = System.console()
  145.     2.times { log.info '' }
  146.     console.readLine()
  147.     isCancelled.set(true)
  148.     while (!sub.disposed) {
  149.         sleep 10
  150.     }
  151. }
  152.  
  153. log.info 'done'
  154.  
  155.  
  156. // *******************************************************************************
  157. // functions
  158. def printMapEntriesIndented(Map map, Level logLevel = Level.INFO, def indentSymbol = ' ', def indentNum = 2) {
  159.     def maxKeySize = map.keySet()
  160.             .collect { it.toString().size() }
  161.             .max()
  162.     map.each { k, v ->
  163.         log.log logLevel, "${indentSymbol * indentNum}${k.toString().padRight(maxKeySize)} : ${v}"
  164.     }
  165. }
  166.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement