import asyncio
import io
from typing importAny, ClassVar, IO, MutableMapping
from aiopath import PureAsyncPosixPath
from pyrogram.types import CallbackQuery, Message
from anjani import command, filters, listener, plugin, util
classExamplePlugin(plugin.Plugin): name: ClassVar[str] = "Example" disabled: ClassVar[bool] = True helpable: ClassVar[bool] = False
db: util.db.AsyncCollection
asyncdefon_load(self) -> None: self.db = self.bot.db.get_collection("example")
@listener.filters(filters.group)asyncdefon_message(self, message: Message) -> None: self.log.info(f"Received message: {message.text}")
await self.db.update_one(
{"_id": message.id}, {"$set": {"text": message.text}}, upsert=True )
@listener.filters(filters.regex("^/example$"))asyncdefon_callback_query(self, query: CallbackQuery) -> None: self.log.info("Button clicked: %s", query.data)
await query.answer("You clicked the button!")
asyncdefon_chat_action(self, message: Message) -> None:if message.new_chat_members:
for new_member in message.new_chat_members:
self.log.info("New member joined: %s", new_member.first_name)
else:
left_member = message.left_chat_member
self.log.info("A member just left chat: %s", left_member.first_name)
asyncdefon_chat_migrate(self, message: Message) -> None: self.log.info("Migrating chat...")
new_chat = message.chat.id old_chat = message.migrate_from_chat_id
await self.db.update_one(
{"chat_id": old_chat},
{"$set": {"chat_id": new_chat}},
)
asyncdefon_plugin_backup(self, chat_id: int) -> MutableMapping[str, Any]:"""Dispatched when /backup command is Called""" self.log.info("Backing up data plugin: %s", self.name)
data = await self.db.find_one({"chat_id": chat_id}, {"_id": False})
return {self.name: data} if data else {}
asyncdefon_plugin_restore(self, chat_id: int, data: MutableMapping[str, Any]) -> None:"""Dispatched when /restore command is Called""" self.log.info("Restoring data plugin: %s", self.name)
await self.db.update_one({"chat_id": chat_id},
{"$set": data[self.name]},
upsert=True)
asyncdefcmd_test(self, ctx: command.Context) -> str:await ctx.respond("Processing...")
await asyncio.sleep(1)
if ctx.input:
return ctx.inputreturn"It works!"asyncdefget_cat(self) -> IO[bytes]:# Get the link to a random cat pictureasyncwith self.bot.http.get("https://aws.random.cat/meow") as resp:
# Read and parse the response as JSON json = await resp.json()
# Get the "file" field from the parsed JSON object cat_url = json["file"]
# Get the actual cat pictureasyncwith self.bot.http.get(cat_url) as resp:
# Get the data as a byte array (bytes object) cat_data = await resp.read()
# Construct a byte stream from the data.# This is necessary because the bytes object is immutable, but we need to add a "name" attribute to set the# filename. This facilitates the setting of said attribute without altering behavior. cat_stream = io.BytesIO(cat_data)
# Set the name of the cat picture before sending.# This is necessary for Pyrogram to detect the file type and send it as a photo/GIF rather than just a plain# unnamed file that doesn't render as media in clients.# We abuse aiopath to extract the filename section here for convenience, since URLs are *mostly* POSIX paths# with the exception of the protocol part, which we don't care about here. cat_stream.name = PureAsyncPosixPath(cat_url).name
return cat_stream
@command.filters(filters.admin_only)asyncdefcmd_cat(self, ctx: command.Context) -> None:await ctx.respond("Fetching cat...")
cat_stream = await self.get_cat()
await ctx.respond(photo=cat_stream)