Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import re
- import disnake
- from disnake import ApplicationCommandInteraction as Interaction
- from disnake.ext import commands
- import lavalink
- import logging
- # Configure logging
- logging.basicConfig(level=logging.DEBUG)
- url_rx = re.compile(r'https?://(?:www\.)?.+')
- CHOICE = commands.option_enum(["Youtube", "Spotify", "Soundcloud"])
- class LavalinkVoiceClient(disnake.VoiceProtocol):
- def __init__(self, client: disnake.Client, channel: disnake.abc.Connectable):
- self.client = client
- self.channel = channel
- self.guild_id = channel.guild.id
- self._destroyed = False
- if not hasattr(self.client, 'lavalink'):
- self.client.lavalink = lavalink.Client(client.user.id)
- self.client.lavalink.add_node(
- host='localhost',
- port=2333,
- password='youshallnotpass',
- region='us',
- name='default-node'
- )
- self.lavalink = self.client.lavalink
- async def on_voice_server_update(self, data):
- lavalink_data = {'t': 'VOICE_SERVER_UPDATE', 'd': data}
- await self.lavalink.voice_update_handler(lavalink_data)
- async def on_voice_state_update(self, data):
- channel_id = data['channel_id']
- if not channel_id:
- await self._destroy()
- return
- self.channel = self.client.get_channel(int(channel_id))
- lavalink_data = {'t': 'VOICE_STATE_UPDATE', 'd': data}
- await self.lavalink.voice_update_handler(lavalink_data)
- async def connect(self, *, timeout: float, reconnect: bool, self_deaf: bool = False, self_mute: bool = False) -> None:
- self.lavalink.player_manager.create(guild_id=self.channel.guild.id)
- await self.channel.guild.change_voice_state(
- channel=self.channel,
- self_mute=self_mute,
- self_deaf=self_deaf
- )
- async def disconnect(self, *, force: bool = False) -> None:
- player = self.lavalink.player_manager.get(self.channel.guild.id)
- if not force and not player.is_connected:
- return
- await self.channel.guild.change_voice_state(channel=None)
- player.channel_id = None
- await self._destroy()
- async def _destroy(self):
- if self._destroyed:
- return
- self._destroyed = True
- try:
- await self.lavalink.player_manager.destroy(self.guild_id)
- except lavalink.ClientError as e:
- logger.error(f'Error destroying Lavalink player: {e}')
- class Music(commands.Cog):
- def __init__(self, bot):
- self.bot = bot
- @commands.Cog.listener()
- async def on_ready(self):
- if not hasattr(self.bot, 'lavalink'):
- self.bot.lavalink = lavalink.Client(self.bot.user.id)
- self.bot.lavalink.add_node(
- host='localhost',
- port=2333,
- password='youshallnotpass',
- region='us',
- name='default-node'
- )
- async def create_player(self, inter):
- if inter.guild is None:
- await inter.response.send_message("This command can only be used in a server.")
- return
- player = self.bot.lavalink.player_manager.create(inter.guild.id)
- should_connect = inter.data['name'] in ('play',)
- voice_client = inter.guild.voice_client
- if not inter.author.voice or not inter.author.voice.channel:
- await inter.response.send_message('You need to join a voice channel first.')
- return
- voice_channel = inter.author.voice.channel
- if voice_client is None:
- if not should_connect:
- await inter.response.send_message("I'm not playing music.")
- return
- permissions = voice_channel.permissions_for(inter.guild.me)
- if not permissions.connect or not permissions.speak:
- await inter.response.send_message('I need the `CONNECT` and `SPEAK` permissions.')
- return
- if voice_channel.user_limit > 0:
- if len(voice_channel.members) >= voice_channel.user_limit and not inter.guild.me.guild_permissions.move_members:
- raise commands.CommandInvokeError('Your voice channel is full!')
- player.store('channel', inter.channel.id)
- # Automatically join the voice channel if the bot is not already connected
- await voice_channel.connect(cls=LavalinkVoiceClient)
- elif voice_client.channel.id != voice_channel.id:
- # If the bot is connected to another channel, raise an error
- raise commands.CommandInvokeError('You need to be in the same voice channel as the bot.')
- return True
- @commands.slash_command(name='play', description='Searches and plays a song from a given query.')
- async def play(self, inter, query: str, source: CHOICE = None):
- await self.create_player(inter)
- player = self.bot.lavalink.player_manager.get(inter.guild.id)
- # print(player)
- query = query.strip('<>')
- # print(source)
- if not url_rx.match(query):
- if source == 'Spotify':
- query = f'sprec:{query}'
- elif source == 'Soundcloud':
- query = f'scsearch:{query}'
- elif source == 'Youtube':
- query = f'ytsearch:{query}'
- else:
- query = f'ytsearch:{query}'
- # print(query)
- results = await player.node.get_tracks(query)
- embed = disnake.Embed(color=disnake.Color.blurple())
- if results.load_type == lavalink.LoadType.EMPTY:
- return await inter.response.send_message("I couldn't find any tracks for that query.")
- elif results.load_type == lavalink.LoadType.PLAYLIST:
- tracks = results.tracks
- for track in tracks:
- player.add(track=track, requester=inter.author.id)
- embed.title = 'Playlist Enqueued!'
- embed.description = f'{results.playlist_info.name} - {len(tracks)} tracks'
- else:
- track = results.tracks[0]
- embed.title = 'Track Enqueued'
- embed.description = f'[{track.title}]({track.uri})'
- player.add(track=track, requester=inter.author.id)
- await inter.response.send_message(embed=embed)
- if not player.is_playing:
- await player.play()
- @commands.slash_command(name='stop', description='Stops the player and clears the queue.')
- async def stop(self, inter):
- player = self.bot.lavalink.player_manager.get(inter.guild.id)
- if not player.is_connected:
- return await inter.response.send_message("I'm not connected to a voice channel.")
- player.queue.clear()
- # Stop the current track so Lavalink consumes less resources.
- await player.stop()
- # Disconnect from the voice channel.
- await inter.guild.voice_client.disconnect(force=True)
- # await inter.guild.voice_client.disconnect(force=True)
- await inter.response.send_message("Stopped the player and cleared the queue.")
- @commands.slash_command(name='skip', description='Skips the current track.')
- async def skip(self, inter: Interaction):
- player = self.bot.lavalink.player_manager.get(inter.guild.id)
- if not player.is_connected:
- return await inter.response.send_message("I'm not connected to a voice channel.")
- if not player.is_playing:
- return await inter.response.send_message("There's no track playing right now.")
- await player.skip()
- await inter.response.send_message("Skipped the current track.")
- @commands.slash_command(name='join', description='Joins the voice channel you are in.')
- async def join(self, inter):
- await self.create_player(inter)
- await inter.response.send_message("Joined the voice channel.")
- @commands.slash_command(name='volume', description='Sets the player volume.')
- async def volume(self, inter: Interaction, volume: int):
- player = self.bot.lavalink.player_manager.get(inter.guild.id)
- if not player.is_connected:
- return await inter.response.send_message("I'm not connected to a voice channel.")
- if volume < 0 or volume > 1000:
- return await inter.response.send_message("Volume must be between 0 and 100.")
- await player.set_volume(volume)
- await inter.response.send_message(f"Set the player volume to {volume}%.")
- @commands.slash_command(name='pause', description='Pauses the current track.')
- async def pause(self, inter: Interaction):
- player = self.bot.lavalink.player_manager.get(inter.guild.id)
- if not player.is_connected:
- return await inter.response.send_message("I'm not connected to a voice channel.")
- if not player.is_playing:
- return await inter.response.send_message("There's no track playing right now.")
- await player.set_pause(True)
- await inter.response.send_message("Paused the current track.")
- @commands.slash_command(name='resume', description='Resumes the current track.')
- async def continue_(self, inter: Interaction):
- player = self.bot.lavalink.player_manager.get(inter.guild.id)
- if not player.is_connected:
- return await inter.response.send_message("I'm not connected to a voice channel.")
- if not player.is_playing:
- return await inter.response.send_message("There's no track playing right now.")
- await player.set_pause(False)
- await inter.response.send_message("Resumed the current track.")
- @commands.slash_command(name='loop', description='Toggles looping the current track.')
- async def loop(self, inter: Interaction):
- player = self.bot.lavalink.player_manager.get(inter.guild.id)
- if not player.is_connected:
- return await inter.response.send_message("I'm not connected to a voice channel.")
- if not player.is_playing:
- return await inter.response.send_message("There's no track playing right now.")
- player.set_loop(not player.loop)
- loop_state = "enabled" if player.loop else "disabled"
- await inter.response.send_message(f"Looping is now {loop_state}.")
- def setup(bot):
- bot.add_cog(Music(bot))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement